rabbitmq-2-数字孪生牛翰社区-数据算法-牛翰网

rabbitmq-2

springboot/springcloud整合rabbitmq实战
简单例子
生产者
创建项目,引入jar

 <!--springboot整合rabbitmq包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.15</version>
        </dependency>

配置Rabbitmq数据源

spring:
  #spring整合rabbtitmq数据库配置
  rabbitmq:
    username: root
    virtual-host: /root/
    password: tiger
    host: 192.168.170.30
    port: 5672

编写配置交换-队列代码

package com.aaa.sbm.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @FileName: RabbitmqConfig
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:23
 * @Version: 1.0.0
 */
@Configuration //相当于rabbitmq-config.xml  <beans>
public class RabbitmqConfig {

    //定义常量,交换机的名称
    public   static  final String EXCHANGE_NAME="exchange_topic_1";
    //队列名称
    private  static  final String QUEUE_NAME="queue12";

    /**
     * 配置交换机
     * @return
     */
    @Bean
    public Exchange   exchangeTopic(){
         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }


    /**
     *配置队列
     * @return
     */
    @Bean
    public Queue  queue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 配置绑定
     * @return
     */
    @Bean
    public Binding  bindExchangeToQueue(){
        //把交换机和队列绑定时一定写绑定key  = info.#
        return  BindingBuilder.bind(queue()).to( exchangeTopic()).with("info.#").noargs();
    }
}

编写发送消息代码

package com.aaa.sbm.test;

import com.aaa.sbm.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @FileName: TestRabbitmqSendMsg
 * @Description: 测试rabbitmq发送消息的类
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:34
 * @Version: 1.0.0
 */
@SpringBootTest
public class TestRabbitmqSendMsg {

    //使用spring支持的设计模式-模版模式
    // spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
    @Resource
    private RabbitTemplate  rabbitTemplate;
    @Test
    public void testSendMsg(){
        //定义发送字符串
        String  sendMsg = "hello.qy178!!!!";
        // 参数1,交换机名称   参数2  routingKey  参数3  发送字符串对象
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"info.test.msg",sendMsg);
    }
}

测试

先打开rabbit服务器,启动rabbitmq服务
 systemctl start rabbitmq-server

消息者
创建项目,引入jar

 <!--springboot整合rabbitmq包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.15</version>
        </dependency>

配置Rabbitmq数据源

spring:
  #spring整合rabbtitmq数据库配置
  rabbitmq:
    username: root
    virtual-host: /root/
    password: tiger
    host: 192.168.170.30
    port: 5672

编写接受消息监听器
复杂例子
需求
生产者有批量定时处理的任务,需要大量部门信息批量向中间件中存储,然后消费要进行消费处理。
生产者

package com.aaa.sbm.task;

import com.aaa.sbm.config.RabbitmqConfig;
import com.aaa.sbm.entity.Dept;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @FileName: BatchScheduledHandlerDeptInfo
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/11 10:30
 * @Version: 1.0.0
 */
@Component
@EnableScheduling  //开启定时功能
public class BatchScheduledHandlerDeptInfo {

    @Resource
    private RabbitTemplate rabbitTemplate;

    //原子Long类型  线程安全的
    private AtomicInteger atomicInteger  = new AtomicInteger();

    /**
     * 每个5秒钟产生20条部门信息,并发送到消息中间中,使用多线程提高发送速度
     */
    // * 秒   *  分   * 时  * 日 * 月 * 周
    //  /  每隔   - 范围   , 枚举
    // */5 * * * * *  每隔5秒执行一次
    // 5-15 * * * * *   任意一分钟的第5秒,第6秒。。。第15分别执行一次
    // 5,10,15 * * * * *  任意一分钟的第5秒,第10秒,第15分别执行一次
    // 0 30  6  01  * *  每月一周6.30执行一次
    @Scheduled(cron = "*/5 * * * * *")
    public  void   handlerDept(){
        //使用线程池的固定线程池大小类型
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        //启动线程
        for (int i = 0; i < 20; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Integer i = atomicInteger.getAndIncrement();
                    Dept dept = new Dept(i, "开发" + i + "部", "郑州" + i);
                    //使用fastjson把对象转为json发送,方便消费者读取
                    String deptJson = JSON.toJSONString(dept);
                    rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"info.dept",deptJson);
                }
            });
        }
        System.out.println("每隔5秒,发送20个对象到rabbitmq中!!!");

    }
}

消息者

package com.aaa.sbm.util;

import com.aaa.sbm.entity.Dept;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @FileName: RabbitmqConsumerUtil
 * @Description: Rabbitmq消费者工具类,用来监听并处理消息
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:57
 * @Version: 1.0.0
 */
@Component
public class RabbitmqConsumerUtil {


    /**
     * 监听方法
     * @param message
     */
    @RabbitListener(queues = "${queue.name}")    //获取配置文件中配置的队列名称
    public  void    handlerMsg(Message message){
        //底层使用序列化过的字节对象进行传输
        byte[] messageBodyByteArray = message.getBody();
        String deptJson = new String(messageBodyByteArray);
        System.out.println("消费的消息为:"+deptJson);
        Dept dept = JSON.parseObject(deptJson, Dept.class);
        System.out.println("dept对象:"+dept);
    }

}

如何保证消息不丢失
示意图
图片[1]-rabbitmq-2-数字孪生牛翰社区-数据算法-牛翰网
生产者保证消息发送到交换机(confirm机制)
配置
#发布确定类型 none 不开启 不会确定回调 simple 同步 必须等交换机给我发送确认后,再处理业务 correlated 异步 速度快
publisher-confirm-type: correlated
代码实现

package com.aaa.sbm.test;

import com.aaa.sbm.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @FileName: TestRabbitmqSendMsg
 * @Description: 测试rabbitmq发送消息的类
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:34
 * @Version: 1.0.0
 */
@SpringBootTest
public class TestRabbitmqSendMsgComfirm {

    //使用spring支持的设计模式-模版模式
    // spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
    @Resource
    private RabbitTemplate  rabbitTemplate;

   /* @Resource
    private DeptDao deptDao;*/
    @Test
    public void testSendMsg(){
        //deptDao.deleteById(1);
        //定义发送字符串
        String  sendMsg = "hello.qy178!!!!";
        //实例化确认回到接口
        RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback(){
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // CorrelationData correlationData 相关数据,有一个唯一识别的ID和其他信息, boolean ack, @Nullable String cause
                System.out.println("识别ID:"+(correlationData==null?"":correlationData.getId()));
                //ack应答   如果为true说明消息到交换中  false出错没到
                if(ack){
                    System.out.println("消息已经发送到交换机中!");
                }else {
                    System.out.println("出现错误,错误原因为:"+cause);
                }
            }
        };
        //调用确定回调
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // 参数1,交换机名称   参数2  routingKey  参数3  发送字符串对象    exchange_top_1111
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"info.test.msg",sendMsg);
        // 打印提示
        System.out.println("消息发送完毕!");
    }
}

生产者保证消息发送到队列(returnsCallback机制)
配置
#开启返回回调
publisher-returns: true
代码实现

package com.aaa.sbm.test;

import com.aaa.sbm.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @FileName: TestRabbitmqSendMsg
 * @Description: 测试rabbitmq发送消息的类
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:34
 * @Version: 1.0.0
 */
@SpringBootTest
public class TestRabbitmqSendMsgReturnsCallback {

    //使用spring支持的设计模式-模版模式
    // spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
    @Resource
    private RabbitTemplate  rabbitTemplate;

   /* @Resource
    private DeptDao deptDao;*/
    @Test
    public void testSendMsg(){
        //deptDao.deleteById(1);
        //定义发送字符串
        String  sendMsg = "hello.qy178!!!!";
        RabbitTemplate.ReturnsCallback returnsCallback = new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                System.out.println("确定到队列的信息是:"+new String(returnedMessage.getMessage().getBody()));
                System.out.println("交换机:"+returnedMessage.getExchange());
                System.out.println("路由键:"+returnedMessage.getRoutingKey());
                System.out.println("返回码:"+returnedMessage.getReplyCode());
                System.out.println("返回字符串:"+returnedMessage.getReplyText());
            }
        };
        //发送前,设置消息确定到队列的回调
        rabbitTemplate.setReturnsCallback(returnsCallback);
        // 参数1,交换机名称   参数2  routingKey  参数3  发送字符串对象
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"info.test.msg",sendMsg);
        // 打印提示
        System.out.println("消息发送完毕!");
    }
}

消费者保证消息成功消费后再删除

概述
默认情况下,消息自动确认,消费者一旦拿到消息,确定消费,消费就可以删除了。但是消费者拿到消息后在处理业务过程中,可能出现错误,使用消息没有正确的处理完业务,也属于消息不可靠。这种情况下,开启手动确认消息,写代码完成业务处理成功后再确认消息成功消费,再删除,如果出现错误,不确定消息成功消费,并把消息返回队列。
配置
listener:
simple: #支持1个消费者
# 确认类型
acknowledge-mode: manual #none 禁止使用消息确认 auto 自动确认 manual手动确认
#direct: #支持多个消费者
#acknowledge-mode: manual
代码实现

package com.aaa.sbm.util;

import com.aaa.sbm.entity.Dept;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @FileName: RabbitmqConsumerUtil
 * @Description: Rabbitmq消费者工具类,用来监听并处理消息
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:57
 * @Version: 1.0.0
 */
@Component
public class RabbitmqConsumerUtil {


    /**
     * 监听方法
     * @param message
     */
    /*@RabbitListener(queues = "${queue.name}")    //获取配置文件中配置的队列名称
    public  void    handlerMsg(Message message){
        //底层使用序列化过的字节对象进行传输
        byte[] messageBodyByteArray = message.getBody();
        String deptJson = new String(messageBodyByteArray);
        System.out.println("消费的消息为:"+deptJson);
        Dept dept = JSON.parseObject(deptJson, Dept.class);
        System.out.println("dept对象:"+dept);
    }*/

    /**
     * 接受消息的方法,手动保证数据可靠
     * @param message
     * @param channel
     */
    @RabbitListener(queues = "${queue.name}")    //获取配置文件中配置的队列名称
    public  void    handlerMsg(Message message, Channel channel){
        //获取消息唯一数字标识
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //底层使用序列化过的字节对象进行传输
            byte[] messageBodyByteArray = message.getBody();
            System.out.println("消费的消息为:"+messageBodyByteArray);
            //执行业务  //1 ,2,3,4....100
            System.out.println(1/0);
            //...
            //...
            //basicAck手动确认收到信息的方法
            //long deliveryTag 消息唯一数字标识,
            // boolean multiple   许多的 小于当前deliveryTag所有消息都确认,

            channel.basicAck(deliveryTag,false);
        } catch (Exception e) {
            e.printStackTrace();
            ////basicNack  Nack=Not ack  手动不确认收到信息的方法
            //long deliveryTag 消息唯一数字标识,
            // boolean multiple   许多的 小于当前deliveryTag所有消息都确认
            // boolean requeue  重回队列
            try {
                channel.basicNack(deliveryTag,false,true);
                //channel.basicNack(deliveryTag,false,false);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }

    }

}

死信队列
概念
死信(Dead Letter),是rabbitmq中的一种机制,因为某种原因造成消息丢失,丢失的消息就称为死信。把死信放入一个队列,不让它丢失,这个队列称为死信队列。通常死信队列会和一个交换机进行绑定,这个交换机称为死信交换机(Dead Letter Exchange),又叫DLX。
原因
图片[2]-rabbitmq-2-数字孪生牛翰社区-数据算法-牛翰网
示意图
图片[3]-rabbitmq-2-数字孪生牛翰社区-数据算法-牛翰网
代码实现

package com.aaa.sbm.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @FileName: RabbitmqDeadLetterQueueConfig
 * @Description: 死信队列配置
 * @Author: zhz
 * @CreateTime: 2024/12/12 9:37
 * @Version: 1.0.0
 */
@Configuration
public class RabbitmqDeadLetterQueueConfig {

    public  static  final  String BUSINESS_EXCHANGE_NAME="businessExchangeA";
    private   static  final  String DEAD_LETTER_EXCHANGE_NAME="deadLetterExchangeA";
    private   static  final  String DEAD_LETTER_QUEUE_NAME="deadLetterQueueA";
    private   static  final  String BUSINESS_QUEUE_NAME="businessQueueA";
    private   static  final  String ROUTING_KEY="deadLetter.info.#";




    /**
     * 配置业务交换机
     * @return
     */
    @Bean
    public Exchange   businessExchange(){
         return ExchangeBuilder.topicExchange(BUSINESS_EXCHANGE_NAME).build();
     }

    /**
     * 配置业务队列
     * @return
     */
    @Bean
    public Queue businessQueue(){
        //实例化参数集合
        Map<String, Object> arguments = new HashMap<>();
        //让当前队列绑定死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
        arguments.put("x-dead-letter-routing-key",ROUTING_KEY);
        //实例化队列,传递参数
        return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(arguments).build();
    }

    /**
     * 把业务交换机和业务队列绑定
     * @return
     */
    @Bean
    public Binding  businessExchangeToQueueBind(){

        return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(ROUTING_KEY).noargs();
    }

    /**
     * 配置死信交换机
     * @return
     */
    @Bean
    public Exchange   deadLetterExchange(){
        return ExchangeBuilder.topicExchange(DEAD_LETTER_EXCHANGE_NAME).build();
    }

    /**
     * 配置死信队列
     * @return
     */
    @Bean
    public Queue  deadLetterQueue(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE_NAME).build();
    }

    /**
     * 把死信交换机和死信队列绑定
     * @return
     */
    @Bean
    public Binding  deadLetterExchangeToQueueBind(){
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY).noargs();
    }

}

测试
测试:消费者不确认消息也不重回队列
生产者

package com.aaa.sbm.test;

import com.aaa.sbm.config.RabbitmqConfig;
import com.aaa.sbm.config.RabbitmqDeadLetterQueueConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @FileName: TestRabbitmqSendMsg
 * @Description: 测试rabbitmq发送消息的类
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:34
 * @Version: 1.0.0
 */
@SpringBootTest
public class TestRabbitmqDeadLetterSendMsg {

    //使用spring支持的设计模式-模版模式
    // spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
    @Resource
    private RabbitTemplate  rabbitTemplate;

   /* @Resource
    private DeptDao deptDao;*/
    @Test
    public void testSendMsg(){
        //deptDao.deleteById(1);
        //定义发送字符串
        String  sendMsg = "hello.qy178!!!!";
        // 参数1,交换机名称   参数2  routingKey  参数3  发送字符串对象
        rabbitTemplate.convertAndSend(RabbitmqDeadLetterQueueConfig.BUSINESS_EXCHANGE_NAME,"deadLetter.info.test",sendMsg);
        // 打印提示
        System.out.println("消息发送完毕!");
    }
}

消费者

  //basicNack 不确认     boolean requeue=false  不重回队列
                channel.basicNack(deliveryTag,false,false);

测试:消息超时丢失。
生产者

 //实例化参数集合
        Map<String, Object> arguments = new HashMap<>();
         //设置业务队列存放消息后,消息的过期时间   x-message-ttl =time to live   单位是:毫秒
        arguments.put("x-message-ttl",20000);

消费者
消费者20秒之内一定不要运行

测试:消息超出了队列的存放长度丢失。
生成者
设置长度

  //实例化参数集合
        Map<String, Object> arguments = new HashMap<>();
         //设置业务队列存放消息后,消息的过期时间   x-message-ttl =time to live   单位是:毫秒
       // arguments.put("x-message-ttl",20000);
        //设置业务队列存放消息的最大长度10
        arguments.put("x-max-length",10);

循环发送信息,发送数量大于长度

package com.aaa.sbm.test;

import com.aaa.sbm.config.RabbitmqConfig;
import com.aaa.sbm.config.RabbitmqDeadLetterQueueConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @FileName: TestRabbitmqSendMsg
 * @Description: 测试rabbitmq发送消息的类
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:34
 * @Version: 1.0.0
 */
@SpringBootTest
public class TestRabbitmqDeadLetterSendMsg {

    //使用spring支持的设计模式-模版模式
    // spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
    @Resource
    private RabbitTemplate  rabbitTemplate;

   /* @Resource
    private DeptDao deptDao;*/
    @Test
    public void testSendMsg(){
        for (int i = 0; i < 15; i++) {
            //deptDao.deleteById(1);
            //定义发送字符串
            String  sendMsg = "hello.qy178!!!!"+i;
            // 参数1,交换机名称   参数2  routingKey  参数3  发送字符串对象
            rabbitTemplate.convertAndSend(RabbitmqDeadLetterQueueConfig.BUSINESS_EXCHANGE_NAME,"deadLetter.info.test",sendMsg);
        }
         // 打印提示
        System.out.println("消息发送完毕!");
    }
}

消费者
消费者不要运行
延迟队列
概念
延迟队列(Delay Queue),生成者生产消息后,不让消费者立马消费,而是过一段时间再让消费者消费,这种机制就是延迟队列。实现方式可以借助死信队列实现(本节就是这么做的),也可以通过插件实现(自己找帖子看下)。
应用场景
电商项目中最常见的订单超时功能
图片[4]-rabbitmq-2-数字孪生牛翰社区-数据算法-牛翰网
会议提醒功能
网站注册后,提醒登录
等等
示意图
图片[5]-rabbitmq-2-数字孪生牛翰社区-数据算法-牛翰网
代码实现
生产者

把死信队列代码中的businessExchange和businessQueue改为delayExchage和DelayQueue并设置DelayQueue的存储消息的过期时间

消费者
监控死信队列即可

集群

来源链接:https://www.cnblogs.com/xiaomubupi/p/18643862

请登录后发表评论

    没有回复内容