重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
一、阿里云官网---帮助文档
创新互联主要从事成都网站设计、网站制作、网页设计、企业做网站、公司建网站等业务。立足成都服务疏附,10余年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:18980820575
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh
按照官网步骤,创建Topic、申请发布(生产者)、申请订阅(消费者)
二、代码
1、配置:
public class MqConfig { /** * 启动测试之前请替换如下 XXX 为您的配置 */ public static final String PUBLIC_TOPIC = "test";//公网测试 public static final String PUBLIC_PRODUCER_ID = "PID_SCHEDULER"; public static final String PUBLIC_CONSUMER_ID = "CID_SERVICE"; public static final String ACCESS_KEY = "123"; public static final String SECRET_KEY = "123"; public static final String TAG = ""; public static final String THREAD_NUM = "25";//消费端线程数 /** * ONSADDR 请根据不同Region进行配置 * 公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet * 公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */ public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"; }
ONSADDR 阿里云用 公有云生产,测试用公网
不同的业务可以设置不同的tag,但是如果发送消息量大的话,建议新建TOPIC
2、生产者
方式1:
配置文件:producer.xml
<?xml version="1.0" encoding="UTF-8"?>
启动方式1,在使用类的全局里设置:
//初始化生产者 private ApplicationContext ctx; private ProducerBean producer; @Value("${producerConfig.enabled}")//开关,spring配置项,true为开启,false关闭 private boolean producerConfigEnabled; @PostConstruct public void init(){ if (true == producerConfigEnabled) { ctx = new ClassPathXmlApplicationContext("producer.xml"); producer = (ProducerBean) ctx.getBean("producer"); } }
PS:最近发现一个坑,如果producer用上面这种方式启动的话,一旦启动的多了,会造成fullGC,所以可以换成下面这种注解方式启动,在用到的地方手动start、shutdown
方式2:配置类(不需要xml)
@Configuration public class ProducerBeanConfig { @Value("${openservices.ons.producerBean.producerId}") private String producerId; @Value("${openservices.ons.producerBean.accessKey}") private String accessKey; @Value("${openservices.ons.producerBean.secretKey}") private String secretKey; private ProducerBean producerBean; @Value("${openservices.ons.producerBean.ONSAddr}") private String ONSAddr; @Bean public ProducerBean oneProducer() { ProducerBean producerBean = new ProducerBean(); Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.ProducerId, producerId); properties.setProperty(PropertyKeyConst.AccessKey, accessKey); properties.setProperty(PropertyKeyConst.SecretKey, secretKey); properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr); producerBean.setProperties(properties); return producerBean; } }
PS:经过这次双11发现,以上2种方式在大数据量,多线程情况下都不太适用, 性能很差,所以推荐用3
方式3:(不需要xml)
@Component public class ProducerBeanSingleTon { @Value("${openservices.ons.producerBean.producerId}") private String producerId; @Value("${openservices.ons.producerBean.accessKey}") private String accessKey; @Value("${openservices.ons.producerBean.secretKey}") private String secretKey; @Value("${openservices.ons.producerBean.ONSAddr}") private String ONSAddr; private static Producer producer; private static class SingletonHolder { private static final ProducerBeanSingleTon INSTANCE = new ProducerBeanSingleTon(); } private ProducerBeanSingleTon (){} public static final ProducerBeanSingleTon getInstance() { return SingletonHolder.INSTANCE; } @PostConstruct public void init(){ // producer 实例配置初始化 Properties properties = new Properties(); //您在控制台创建的Producer ID properties.setProperty(PropertyKeyConst.ProducerId, producerId); // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.setProperty(PropertyKeyConst.AccessKey, accessKey); // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.setProperty(PropertyKeyConst.SecretKey, secretKey); //设置发送超时时间,单位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 设置 TCP 接入域名(此处以公共云生产环境为例) properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr); producer = ONSFactory.createProducer(properties); // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可 producer.start(); } public Producer getProducer(){ return producer; } }
spring配置
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect consumerConfig.enabled = true producerConfig.enabled = true #方式1: scheduling.enabled = false #方式2、3:rocketMQ \u516C\u7F51\u914D\u7F6E openservices.ons.producerBean.producerId = pid openservices.ons.producerBean.accessKey = openservices.ons.producerBean.secretKey = openservices.ons.producerBean.ONSAddr = 公网、杭州公有云生产
方式1投递消息代码:
try { String jsonC = JsonUtils.toJson(elevenMessage); Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes()); SendResult sendResult = producer.send(message); if (sendResult != null) { logger.info(".Send mq message success!”; } else { logger.warn(".sendResult is null........."); } } catch (Exception e) { logger.warn("DoubleElevenAllPreService"); Thread.sleep(1000);//如果有异常,休眠1秒 }
方式2投递消息代码:(可以每发1000个启动/关闭一次)
producerBean.start(); try { String jsonC = JsonUtils.toJson(elevenMessage); Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes()); SendResult sendResult = producer.send(message); if (sendResult != null) { logger.info(".Send mq message success!”; } else { logger.warn(".sendResult is null........."); } } catch (Exception e) { logger.warn("DoubleElevenAllPreService"); Thread.sleep(1000);//如果有异常,休眠1秒 } producerBean.shutdown();
方式3:投递消息
try { String jsonC = JsonUtils.toJson(elevenMessage); Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes()); Producer producer = ProducerBeanSingleTon.getInstance().getProducer(); SendResult sendResult = producer.send(message); if (sendResult != null) { logger.info("DoubleElevenMidService.Send mq message success! Topic is:"”; } else { logger.warn("DoubleElevenMidService.sendResult is null........."); } } catch (Exception e) { logger.error("DoubleElevenMidService Thread.sleep 1 s___error is "+e.getMessage(), e); Thread.sleep(1000);//如果有异常,休眠1秒 }
发送消息的代码一定要捕获异常,不然会重复发送。
这里的TOPIC用自己创建的,elevenMessage是要发送的内容,我这里是自己建的对象
3、消费者
配置启动类:
@Configuration @ConditionalOnProperty(value = "consumerConfig.enabled", havingValue = "true", matchIfMissing = true) public class ConsumerConfig { private Logger logger = LoggerFactory.getLogger(LoggerAppenderType.smsdist.name()); @Bean public Consumer consumerFactory(){//不同消费者 这里不能重名 Properties consumerProperties = new Properties(); consumerProperties.setProperty(PropertyKeyConst.ConsumerId, MqConfig.CONSUMER_ID); consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY); consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY); //consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums,MqConfig.THREAD_NUM); consumerProperties.setProperty(PropertyKeyConst.ONSAddr, MqConfig.ONSADDR); Consumer consumer = ONSFactory.createConsumer(consumerProperties); consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new DoubleElevenMessageListener());//new对应的监听器 consumer.start(); logger.info("ConsumerConfig start success."); return consumer; } }
CID和ONSADDR一点要选对,用自己的,消费者线程数等可以在这里配置
创建消息监听器类,消费消息:
@Component public class MessageListener implements MessageListener { private Logger logger = LoggerFactory.getLogger("remind"); protected static ElevenReposity elevenReposity; @Resource public void setElevenReposity(ElevenReposity elevenReposity){ MessageListener .elevenReposity=elevenReposity; } @Override public Action consume(Message message, ConsumeContext consumeContext) { if(message.getTopic().equals("自己的TOPIC")){//避免消费到其他消息 json转换报错 try { byte[] body = message.getBody(); String res = new String(body); //res 是生产者传过来的消息内容 //业务代码 }else{ logger.warn("!"); } } catch (Exception e) { logger.error("MessageListener.consume error:" + e.getMessage(), e); } logger.info("MessageListener.Receive message”); //如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater return Action.CommitMessage; }else{ logger.warn(); return Action.ReconsumeLater; } }
注意,由于消费者是多线程的,所以对象要用static+set注入,把对象的级别提升到进程,这样多个线程就可以共用,但是无法调用父类的方法和变量
消费者状态可以查看消费者是否连接成功,消费是否延迟,消费速度等
重置消费位点可以清空所有消息
三、注意事项
1、发送的消息体 最大为256KB
2、消息最多存在3天
3、消费端默认线程数是20
4、如果运行过程中出现java挂掉或者cpu占用异常高,可以在发送消息的时候,每发送1000条让线程休息1s
5、本地测试或启动的时候,把ONSADDR换成公网,不然报错无法启动
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持创新互联。