重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

SpringBoot整合RabbitMQ-创新互联

RabbitMQ部署指北
下载镜像

docker pull rabbitmq:3.8-management

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名与空间、雅安服务器托管、营销软件、网站建设、吉利网站维护、网站推广。
执行下面的命令来运行MQ容器:

docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=zhangbo123456* \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \ 
-p 5672:5672 \
-d \
rabbitmq:3.8-management

什么是消息队列

MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

RabbitMQ快速入门

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com

SpringAMQP

1,Basic Queue 简单队列模型

2,Work Queue 工作队列模型

3,发布订阅模型 fanout

4,发布订阅模型 Direct

5,发布订阅模型 Topic

6,消息转换器

概念:

AMQP:是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求

SpringAMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现

AMQP和JMS区别和联系

MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。

两者间的区别和联系:

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式

  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。

  • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

HelloWorld案例

官方的helloword是基于最基础的消息队列模型来实现的,其中包括三个角色

1,publisher:消息发布者,要将消息发布到队列queue

2,queue:消息队列,负责接收并缓存消息

3,consumer:订阅队列,处理队列中的消息

基本消息队列的消息发送流程

1,建立connection

2,创建channel

3,利用channel声名队列

4,利用channel向队列发送消息

基本消息队列的消息接收流程

1,建立connection

2,创建channel

3,利用channel声名队列

4,定义consumer的消费行为handleDelivery

5,利用channel将消费者与队列绑定

快速开始

第一步导入依赖


 org.springframework.boot
  spring-boot-starter-amqp

第二步编写配置文件

spring:
  rabbitmq:
    host: 47.99.139.160  #主机
    port: 5672   #端口号
    virtual-host: /    #虚拟主机
    username: itcast   #用户名
    password: zhangbo123456*   #密码

第三步编写测试方法

@Autowired
RabbitTemplate rabbitTemplate;

@Test
void contextLoads() {
    String queueName = "simple.queue";
    String message = "hello , spring amqp";
    rabbitTemplate.convertAndSend(queueName,message);
}

小注:这个消息不会 创建队列,所以要手动创建队列

第四步在Consumer中编写消费逻辑,监听队列

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimplateQueueMessage(String msg) throws InterruptedException{
        System.out.println("spring消费者接收到消息:"+msg);
    }
}

消息预取限制

修改application.yml,设置preFetch这个值,可以控制预取消息的上线

spring:
rabbitmq:
  host: 47.99.139.160  #主机
  port: 5672   #端口号
  virtual-host: /    #虚拟主机
  username: itcast   #用户名
  password: zhangbo123456*   #密码
  listener:
simple:
  prefetch: 1  #每次只能获取一条消息,处理完成才能获取下一条消息 

发布 订阅

发布订阅模式允许将同一消息发送个多个消费者,实现方式是加入了exchange

常见exchange类型包括

  • Fanout:广播

  • Direct:路由

  • Topic:话题

发布订阅-Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue(可以用于实现广播模式)

实现思路:

1,在consumer服务中,利用代码声明队列,交换机,并将两者绑定

2,在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

3,在publisher中编写测试方法,向itcast.fanout发送消息

步骤一 :在consumer服务声名exchange,queue,binding,在consumer服务声名一个配置类,添加@Configuration注解,并声明FanoutExchange,queue和绑定关系对象binding

@Configuration
public class FanoutConfig {

    //声名FanoutChange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    //声名第一个队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    //绑定队列一和交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1 , FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    //...略,以相同的方式声名第二个队列,并完成绑定
}

consumer代码

//fanout 模式
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueueMessage(String msg) throws InterruptedException{
        System.out.println("spring消费者接收到fanout.queue1消息:"+msg);
    }

    //fanout 模式
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueueMessage2(String msg) throws InterruptedException{
        System.out.println("spring消费者接收到fanout.queue2消息:"+msg);
    }

publisher代码

//fanout 模式
    @Test
    public void testSendFanoutExchange(){
        //交换机名称
        String exchangeName = "itcast.fanout";
        //消息
        String message = "hello , every one";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }

总结:

交换机的作用?

1,接收publisher发送的消息

2,将消息按照路由规则路由到与之绑定的队列

3,不能缓存消息,路由失败,消息丢失

4,FanoutExchange的会将消息路由到每个绑定的队列

声名队列,交换机,绑定关系的bean是什么?

  • queue

  • fanoutExchange

  • Binding

发布订阅-DirectExchange

Direct Exchange会将接收到的消息根据规则路由到指定的queue,因此称之为路由模式(routes)

  • 每一个Queue都与Exchange设置一个BindingKey

  • 发布者发送消息时,指定消息的RoutingKey

  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例实现思路

1,利用@RabbitListener声名Exchange,Queue,RoutingKey

2,zaiconsumer服务中,编写两个消费者方法,分别监听direct.queue和direct.queue2

3,在publisher中编写测试方法,向itcast.direct发送消息

consumer

//direct模式
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void listenDirectQueue(String msg){
        System.out.println("spring消费者接收到direct.queue1消息:"+msg);
    }

    //direct模式
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("spring消费者接收到direct.queue2消息:"+msg);
    }

publisher

//direct 模式
    @Test
    public void testSendDirectExchange(){
        //交换机名称
        String exchangeName = "itcast.direct";
        //消息
        String message = "hello , smoky";
        //发送消息  参数分别是:交换机名称 RoutingKey(暂时为空,路由key),消息
        rabbitTemplate.convertAndSend(exchangeName,"smoky",message);
    }

总结:

描述direct交换机和fanout交换机的差异?

fanout交换机将消息发送给每一个与之绑定的队列

directii交换机根据RoutingKey判断路由给那个队列

如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声名队列和交换机有哪些常见注解?

@Queue

@Exchange

发布订阅-TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割

Queue与Exchange指定BIndingKey时可以指定通配符

#:代指0个或多个单词

*:代指一个单词

案例实现思路

1,利用@RabbitListener声名Exchange Queue RoutingKey

2,在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

3,在publisher中编写测试方法,向itcast.topic发送消息

consumer

//topic模式
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue1"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key = "chain.#"
    ))
    public void listenTopictQueue1(String msg){
        System.out.println("spring消费者接收到topic.queue1消息:"+msg);
    }

    //topic模式
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue2"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key = "*.news"
    ))
    public void listenTopictQueue2(String msg){
        System.out.println("spring消费者接收到topic.queue2消息:"+msg);
    }

publisher

//direct 模式
    @Test
    public void testSendTopictExchange(){
        //交换机名称
        String exchangeName = "itcast.topic";
        //消息
        String message = "今天天气很好呀";
        //发送消息  参数分别是:交换机名称 RoutingKey(暂时为空,路由key),消息
        rabbitTemplate.convertAndSend(exchangeName,"chain.weather",message);
    }
测试发送Object类型消息,消息转换器

说明:在SpringAMQP的发送方法中,接收到的消息类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送,用的jdk的序列化器

补充: 使用jdk的序列化器的缺点:1,性能比较差 2,安全性不好,容易出现注入的问题 3,数据长度长,占用额外内存

测试代码

//测试Object类型消息
    @Test
    public void sendObjectQueue(){
        Mapmsg = new HashMap<>();
        msg.put("name","柳岩");
        msg.put("age",21);
        rabbitTemplate.convertAndSend("object.queue",msg);
    }

Spring的对消息对象的处理是由import org.springframework.messaging.converter.MessageConverter;来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化

如果要修改只需要定义一个MessageConverter类型的bean即可,推荐使用JSON的方式序列化

引入依赖

 
 com.fasterxml.jackson.core
  jackson-databind
 

声名一个MessageConverter类型的bean  

@Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

consumer

引入依赖


 com.fasterxml.jackson.core
  jackson-databind
 

consumer服务定义MessageConverter

@Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

SpringAMQP中消息的序列化和反序列化是怎么实现的

  • 利用MessageConverter实现的,默认是JDK的序列化

  • 注意发送方接收必须使用相同的MessageConverter

MQ的一些常见问题

1,消息可靠性:如何确保发送的消息至少被消费一次

2,延迟消息问题:如何实现消息的延迟投递

3,高可用问题:如何避免单点的MQ故障而导致的不可用问题

4,消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题

消息可靠性问题

消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?

  • 发送时丢失,

    • 生产者发送的消息未到达exchange

    • 消息到达exchange后未到达queue

  • MQ宕机,queue将消息丢失

  • consumer接收到消息后未消费就宕机

生产者确认机制

RabbitMq提供了publisher confirm机制避免消息发送到MQ的过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功,结果有两种请求

  • publisher-confirm,发送者确认

    • 消息成功投递到交换机返回ack

    • 消息未投递到交换机,返回nack

  • publisher-return

    • 消息投递到交换机了,但是没有路由到队列,返回ACK,及路由失败原因

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同的消息,避免ack冲突

消费者确认

RabbitMQ支持消费者确认机制,即消费者成功处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息,

而SpringAMQP允许配置三种确认模式

  • manual:手动ack,需要在业务代码结束后,调用api发送ack

  • auto:自动ack,由spring检测listener代码是否出现异常,没有异常则返回ack,抛出异常则返回nack

  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后会立即被删除

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


当前名称:SpringBoot整合RabbitMQ-创新互联
文章路径:http://cqcxhl.cn/article/ddehhg.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP