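What does ListenerContainerConfiguration do in RocketMQ? This article answers the question by reading the rocketmq-spring-boot 2.0.3 source: the configuration class itself and the RocketMQMessageListener annotation it processes.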
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);

    private ConfigurableApplicationContext applicationContext;

    private AtomicLong counter = new AtomicLong(0);

    private StandardEnvironment environment;

    private RocketMQProperties rocketMQProperties;

    private ObjectMapper objectMapper;

    public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper,
        StandardEnvironment environment,
        RocketMQProperties rocketMQProperties) {
        this.objectMapper = rocketMQMessageObjectMapper;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

        if (Objects.nonNull(beans)) {
            beans.forEach(this::registerContainer);
        }
    }

    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
        }

        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
        validate(annotation);

        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }

        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener) bean);
        container.setObjectMapper(objectMapper);
        container.setName(name);  // REVIEW ME, use the same clientId or multiple?

        return container;
    }

    private void validate(RocketMQMessageListener annotation) {
        if (annotation.consumeMode() == ConsumeMode.ORDERLY &&
            annotation.messageModel() == MessageModel.BROADCASTING) {
            throw new BeanDefinitionValidationException(
                "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
        }
    }
}
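ListenerContainerConfiguration implements the ApplicationContextAware and SmartInitializingSingleton interfaces.
Its setApplicationContext method stores the applicationContext; its afterSingletonsInstantiated method looks up every bean annotated with RocketMQMessageListener and calls registerContainer on each one in turn.
registerContainer first checks that the bean implements RocketMQListener and throws IllegalStateException if it does not. It then reads the RocketMQMessageListener annotation and validates it; the one combination rejected is consumeMode ORDERLY together with messageModel BROADCASTING. Next it registers a DefaultRocketMQListenerContainer, built by createRocketMQListenerContainer, in the applicationContext under a bean name of the form "<container class name>_<counter>". Finally it calls start on the container if it is not already running.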
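The flow described above can be modelled without Spring or RocketMQ on the classpath. The following is a minimal sketch using only the JDK; every type in it (Listener, MessageHandler, Container, Registrar) is a hypothetical stand-in for RocketMQMessageListener, RocketMQListener, DefaultRocketMQListenerContainer and ListenerContainerConfiguration respectively, and a plain map plays the role of the application context. The annotation validation step is left out here to keep the sketch short.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// JDK-only model of the registration flow; all nested types are hypothetical stand-ins.
class ListenerRegistrationSketch {

    // Stand-in for @RocketMQMessageListener, reduced to two attributes.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Listener {
        String topic();
        String consumerGroup();
    }

    // Stand-in for RocketMQListener<T>.
    interface MessageHandler<T> {
        void onMessage(T message);
    }

    // Stand-in for DefaultRocketMQListenerContainer.
    static class Container {
        final String name;
        final String topic;
        final MessageHandler<?> handler;
        private boolean running;

        Container(String name, String topic, MessageHandler<?> handler) {
            this.name = name;
            this.topic = topic;
            this.handler = handler;
        }

        boolean isRunning() { return running; }

        void start() { running = true; }
    }

    // Stand-in for ListenerContainerConfiguration; the map replaces the application context.
    static class Registrar {
        private final AtomicLong counter = new AtomicLong(0);
        final Map<String, Container> containers = new LinkedHashMap<>();

        // Plays the role of afterSingletonsInstantiated: visit every annotated bean.
        void registerAll(Map<String, Object> beans) {
            beans.forEach((beanName, bean) -> {
                if (bean.getClass().isAnnotationPresent(Listener.class)) {
                    registerContainer(beanName, bean);
                }
            });
        }

        // Plays the role of registerContainer: type check, build, register, start.
        void registerContainer(String beanName, Object bean) {
            Class<?> clazz = bean.getClass();
            if (!(bean instanceof MessageHandler)) {
                throw new IllegalStateException(clazz + " is not instance of " + MessageHandler.class.getName());
            }
            Listener annotation = clazz.getAnnotation(Listener.class);
            // Same naming scheme as the source: container class name plus a running counter.
            String containerName = String.format("%s_%s", Container.class.getName(), counter.incrementAndGet());
            Container container = new Container(containerName, annotation.topic(), (MessageHandler<?>) bean);
            containers.put(containerName, container);
            if (!container.isRunning()) {
                container.start();
            }
        }
    }

    @Listener(topic = "demo-topic", consumerGroup = "demo-group")
    static class OrderListener implements MessageHandler<String> {
        @Override
        public void onMessage(String message) {
            System.out.println("received: " + message);
        }
    }

    // Annotated but does not implement MessageHandler, so registration must fail.
    @Listener(topic = "demo-topic", consumerGroup = "other-group")
    static class NotAListener { }

    public static void main(String[] args) {
        Registrar registrar = new Registrar();
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("orderListener", new OrderListener());
        registrar.registerAll(beans);
        registrar.containers.forEach((name, c) -> System.out.println(name + " running=" + c.isRunning()));
    }
}
```

The point the sketch tries to make is that one container is created per annotated listener bean, and that an annotated class which does not implement the listener interface fails fast at startup rather than being silently skipped.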
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {

    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    /**
     * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
     * load balance. It's required and needs to be globally unique.
     *
     * See here for further discussion.
     */
    String consumerGroup();

    /**
     * Topic name.
     */
    String topic();

    /**
     * Control how to selector message.
     *
     * @see SelectorType
     */
    SelectorType selectorType() default SelectorType.TAG;

    /**
     * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
     */
    String selectorExpression() default "*";

    /**
     * Control consume mode, you can choice receive message concurrently or orderly.
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /**
     * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
     */
    MessageModel messageModel() default MessageModel.CLUSTERING;

    /**
     * Max consumer thread number.
     */
    int consumeThreadMax() default 64;

    /**
     * Max consumer timeout, default 30s.
     */
    long consumeTimeout() default 30000L;

    /**
     * The property of "access-key".
     */
    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    /**
     * The property of "secret-key".
     */
    String secretKey() default SECRET_KEY_PLACEHOLDER;

    /**
     * Switch flag instance for message trace.
     */
    boolean enableMsgTrace() default true;

    /**
     * The name value of message trace topic.If you don't config,you can use the default trace topic name.
     */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    /**
     * The property of "name-server".
     */
    String nameServer() default NAME_SERVER_PLACEHOLDER;

    /**
     * The property of "access-channel".
     */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
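The RocketMQMessageListener annotation defines the attributes consumerGroup, topic, selectorType, selectorExpression, consumeMode, messageModel, consumeThreadMax, consumeTimeout, accessKey, secretKey, enableMsgTrace, customizedTraceTopic, nameServer and accessChannel. Only consumerGroup and topic are required; the others have defaults, and the string-valued ones (accessKey, secretKey, customizedTraceTopic, nameServer, accessChannel) default to property placeholders such as "${rocketmq.name-server:}" that resolve to an empty string when the property is not set.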
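To illustrate how createRocketMQListenerContainer and validate treat these attributes, here is a rough JDK-only sketch. The resolvePlaceholders method below is a simplified, hypothetical substitute for Spring's environment.resolvePlaceholders and handles only the "${key}" and "${key:default}" forms; effectiveNameServer and the two enums are likewise illustrative names, and the global fallback that the source obtains from rocketMQProperties.getNameServer() is passed in as a plain argument.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// JDK-only model of attribute resolution and validation; all names are hypothetical.
class ListenerAttributeSketch {

    enum ConsumeMode { CONCURRENTLY, ORDERLY }

    enum MessageModel { BROADCASTING, CLUSTERING }

    // Matches ${key} and ${key:default}; group 2 is null when no default is given.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    // Simplified substitute for environment.resolvePlaceholders:
    // unknown keys without a default are left untouched.
    static String resolvePlaceholders(String text, Map<String, String> props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String fallback = m.group(2) == null ? m.group(0) : m.group(2);
            String value = props.getOrDefault(m.group(1), fallback);
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Mirrors the nameServer handling: resolve the annotation value first,
    // and use the global setting only when the result is empty.
    static String effectiveNameServer(String annotationValue, Map<String, String> props, String globalNameServer) {
        String resolved = resolvePlaceholders(annotationValue, props);
        return resolved.isEmpty() ? globalNameServer : resolved;
    }

    // Mirrors validate: ORDERLY consumption cannot be combined with BROADCASTING.
    static void validate(ConsumeMode consumeMode, MessageModel messageModel) {
        if (consumeMode == ConsumeMode.ORDERLY && messageModel == MessageModel.BROADCASTING) {
            throw new IllegalArgumentException("messageModel BROADCASTING does not support ORDERLY message!");
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Annotation left at its default placeholder and no property set: the global value is used.
        System.out.println(effectiveNameServer("${rocketmq.name-server:}", props, "127.0.0.1:9876"));
        // A literal address on the annotation takes precedence over the global value.
        System.out.println(effectiveNameServer("10.0.0.5:9876", props, "127.0.0.1:9876"));
        // The default combination passes validation.
        validate(ConsumeMode.CONCURRENTLY, MessageModel.CLUSTERING);
    }
}
```

Because topic and consumerGroup also pass through resolvePlaceholders in the source, they can be written as placeholders on the annotation as well, which keeps environment-specific names out of the code.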