RabbitMQ

MQ全称:Message Queue即消息队列,跨进程的通信机制

应用场景:

  • 12306购票排队
  • 秒杀

作用:

  • 流量消峰
    • 在高并发的场景下,所有的请求如果都需要访问数据库,那么会造成数据库连接异常
    • 使用消息队列可以减轻数据库的压力,把请求放到队列中,再慢慢的一个一个的响应,避免数据库的连接异常
    • image-20220726175731617
  • 解耦
    • 如下图,当多个系统进行联系起来的时候可以使用消息队列作为中间的媒介
    • image-20220726190956455
  • 异步

分类

ActiveMQActive积极的活跃的

  • 缺点:吞吐量比较低,也就是每秒处理的个数比较少

Kafka

  • 为大数据而生的中间件,每秒的连接能够达到百万级别,吞吐量最高
  • 主要用于日志的采集

RocketMQ

  • 阿里开发的,用Java实现,但支持的客户端不多,仅支持JavaC++

RabbitMQ

  • 支持语言多、高并发、社区活跃
  • 拥有web端管理界面
  • 拥有各种各样的插件
  • 可以分布式部署

概念

生产者:消息的发送方

消费者:消息的接收方

交换机:用来分发消息,接收生产者发来的消息并推送到队列中

队列:存放的具体的消息,消费者可以再队列中接收消息

生产者和交换机之间的连接称为Channel,交换机和队列之间的连接称为binding key

image-20220728214630121

安装

Erlanghttps://github.com/rabbitmq/erlang-rpm/releases

Socat

yum install socat 

RabbitMQ-serverhttps://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.10.6

开启Web管理页面

rabbitmq-plugins enable rabbitmq_management

开启后,可以访问localhost:15672进行访问,默认账号密码都是guest,这个账户无法通过外网进行登录

需要添加一个用户并赋予管理员角色:

# 添加用户
rabbitmqctl add_user 用户名 密码
# 设置角色
rabbitmqctl set_user_tag 用户名 administrator
# 设置用户权限
rabbitmqctl set_permissions -p "/" 用户名 "配置" "写" "读"
# 设置用户全部权限
rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"
# 查看所有用户
rabbitmqctl list_users 

开启5672端口

引入依赖:

        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.15.0</version>
        </dependency>

发送消息(生产者)

建立连接:

  • 实例化一个ConnectionFactory 对象
  • setHost("IP地址")设置IP地址
  • setUsername("用户名")设置用户名
  • setPassword("密码")设置密码
  • newConnection()获取一个连接

通过连接创建信道:createChannel()

通过信道创建队列:queueDeclare("队列名称", 持久化, 此连接有效, 自动删除, 参数)

通过信道发送消息:basicPublish("交换机名称", "队列名称", 参数, 消息内容byte[]);

public static void main(String[] args) throws IOException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    Properties properties = new Properties();
    properties.load(ClassLoader.getSystemResourceAsStream("info.properties"));
    connectionFactory.setHost(properties.getProperty("host"));
    connectionFactory.setUsername(properties.getProperty("username"));
    connectionFactory.setPassword(properties.getProperty("password"));
    try (
            Connection connection = connectionFactory.newConnection()
    ) {
        System.out.println("connection = " + connection);
        // 创建一个信道
        Channel channel = connection.createChannel();
        // 创建一个队列,参数含义:
        // 参数1:队列名称
        // 参数2:是否要进行持久化,默认消息是保存在内存中的
        // 参数3:是否要将这个队列中的消息共享(true为这个队列仅限于当前连接使用)
        // 参数4:如果长时间不使用这个队列,是否要进行删除
        // 参数5:其他参数
        channel.queueDeclare("队列名称", true, false, true, null);
        // 发送消息
        // 参数1:交换机名称
        // 参数2:可以为队列的名称
        // 参数3:其他的参数信息
        // 参数4:消息的内容
        channel.basicPublish("", "队列名称", null, "hello world".getBytes());
    } catch (IOException e) {
        throw new RuntimeException(e);
    } catch (TimeoutException e) {
        throw new RuntimeException(e);
    }
}

接收消息(消费者)

同样也需要进行连接、创建信道

通过信道的basicConsume("队列名称", 是否自动应答, 接收消息的回调, 取消接收消息的回调),只要队列一直存在,将会一直接收消息

    public static void main(String[] args) throws IOException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("info.properties"));
        connectionFactory.setHost(properties.getProperty("host"));
        connectionFactory.setUsername(properties.getProperty("username"));
        connectionFactory.setPassword(properties.getProperty("password"));
        try (
                Connection connection = connectionFactory.newConnection()
        ) {
            Channel channel = connection.createChannel();
            // 参数1:队列名称,参数2:是否自动应答,参数3:接收消息的回调,参数4:取消接收的回调
            channel.basicConsume("队列名称", true, (tag, msg) -> {
//                System.out.println(msg);
                String s = new String(msg.getBody());
                System.out.println("s = " + s);
            }, tag -> {
                System.out.println("用户取消了操作");
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }

    }

工作队列模式

又称为任务队列模式,避免立即执行资源密集型任务。也就是如果有大量消息发送时,将会多个线程一块处理这些消息,也就是一个队列对应多个线程中的消费者,需要注意的是一个消息只能被处理一次

多线程消息轮询

可以创建多个客户端(线程)用来接收同一个队列的消息(这个队列是不自动删除的队列),运行之后可以发现这些客户端将会依次的接收队列中的消息

Deliver中文为传送,读音dəˈlivər

package workqueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import util.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Main {
    public static void main(String[] args) throws IOException, TimeoutException {

        new Thread(() -> {
            try {
                Connection connection = ConnectionUtil.getConnection();
                Channel channel = connection.createChannel();
                DeliverCallback deliverCallback = (tag, msg) -> {
                    System.out.println("第一个线程接收到消息:" + new String(msg.getBody()));
                };

                CancelCallback cancelCallback = (tag) -> {
                    System.out.println("取消接收");
                };
                System.out.println("第一个线程等待接收:");
                channel.basicConsume("队列名称", true, deliverCallback, cancelCallback);
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }
        }).start();
        new Thread(() -> {
            try {
                Connection connection = ConnectionUtil.getConnection();
                Channel channel = connection.createChannel();
                DeliverCallback deliverCallback = (tag, msg) -> {
                    System.out.println("第二个线程接收到消息:" + new String(msg.getBody()));
                };

                CancelCallback cancelCallback = (tag) -> {
                    System.out.println("取消接收");
                };
                System.out.println("第二个线程等待接收:");
                channel.basicConsume("队列名称", true, deliverCallback, cancelCallback);
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }
        }).start();
        new Thread(() -> {
            try {
                Connection connection = ConnectionUtil.getConnection();
                Channel channel = connection.createChannel();
                DeliverCallback deliverCallback = (tag, msg) -> {
                    System.out.println("第三个线程接收到消息:" + new String(msg.getBody()));
                };

                CancelCallback cancelCallback = (tag) -> {
                    System.out.println("取消接收");
                };
                System.out.println("第三个线程等待接收:");
                channel.basicConsume("队列名称", true, deliverCallback, cancelCallback);
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
}

依次发消息,可以看到:

第一个线程等待接收:
第二个线程等待接收:
第三个线程等待接收:
第一个线程接收到消息:第一条消息
第二个线程接收到消息:第二条消息
第三个线程接收到消息:第三条消息
第一个线程接收到消息:第四条消息
第二个线程接收到消息:第五条消息
第三个线程接收到消息:第六条消息
第一个线程接收到消息:第7条消息
第二个线程接收到消息:第8条消息
第三个线程接收到消息:第9条消息

消息应答

当一个消费者处理某个数据时突然挂掉了,如果发生这样的情况,此时出队列的数据就相当于丢失了,为了解决这个问题,RabbitMQ引入了消息应答机制,也就是消费者处理完成之后向RabbitMQ进行确认,表示当前数据已经处理完了,可以在队列中删除了,如果不进行应答,数据将不会被删除

应答方式

自动应答:需要高吞吐量和安全性做权衡,也就是只要消费者接收到消息后,直接应答,不关心这个消息是否处理完毕

手动应答:

  • 肯定确认:channel.basicAck(标签, 是否批量应答)
  • 否定确认:channel.basicNack()
  • 否定并拒绝:channel.basicReject()

手动应答的好处:可以批量应答并减少网络拥堵

  • 批量应答:
    • 假设channel上正在传输标签:3、4、5、6、7的消息,如果进行肯定确认的为7,那么此时3-7都会被确认,channel.basicAck(7, true)
  • 不批量应答:
    • 假设channel上正在传输标签:3、4、5、6、7的消息,如果进行肯定确认的为7二不进行批量应答,那么此时仅仅有7进行确认应答,channel.basicAck(7, false)

虽然批量应答有很多的好处,但为了避免消息的丢失,还是尽量使用不批量应答,如果一个消息丢失了,那么这个消息将会重新入队,并放到队首

取消自动应答

消费者在接收消息时,都是调用的basicConsume("队列名称", 是否自动应答, 接收消息的回调, 取消接收消息的回调)方法,因此如果要取消自动应答,需要将第二个参数置为false

envelope中文为信封,读音为ˈenvəˌlōp

在传送消息的回调中,message参数里边可以获取到当前传送的消息的id,可以通过这个id进行确认或者否定拒绝某个消息,例如msg.getEnvelope().getDeliveryTag()

肯定确认:

// 参数1:消息的id
// 参数2:是否批量确认
channel.basicAck(msg.getEnvelope().getDeliveryTag(), false);

否定确认:

// 参数1:消息的id
// 参数2:是否是否批量否定
// 参数3:是否重新入队
channel.basicNack(msg.getEnvelope().getDeliveryTag(), false, true);
  • 参数3:为false否定了消息并且不重新入队,下个线程接收其他的新的消息。为true代表否定了消息并重新入队,因为是采用工作队列的方式,当下个线程接收消息时,将会收到这个被否定的消息

否定拒绝确认:

// 参数1:消息的id
// 参数2:是否重新入队
channel.basicReject(msg.getEnvelope().getDeliveryTag(), true);
  • 参数2:为false否定了消息并且不重新入队,下个线程接收其他的新的消息。为true代表否定了消息并重新入队,因为是采用工作队列的方式,当下个线程接收消息时,将会收到这个被否定的消息
  • 相当于否定确认并且不批量否定channel.basicNack(msg.getEnvelope().getDeliveryTag(), false, 是否重新入队);

持久化

表示消息是可以进行持久化存储的

队列的持久化

在队列创建时,可以指定是否持久化

通过信道创建队列:queueDeclare("队列名称", 是否持久化, 此连接有效, 是否自动删除, 参数)

如果创建队列时不进行持久化,那么在重启RabbitMQ之后,这个队列将会丢失

PS C:\Users\singx> net stop RabbitMQ
RabbitMQ 服务正在停止....
RabbitMQ 服务已成功停止。
PS C:\Users\singx> net start RabbitMQ
RabbitMQ 服务正在启动 .
RabbitMQ 服务已经启动成功。

如果一个队列是持久化的,那么在网页端所看到的这个队列的features属性将会显示一个DD代表durable,中文为耐用的,读音为ˈdʊrəbl

image-20220801113611888

如上图所示,第一个队列为持久化的队列,第二个队列不是持久化的队列

队列是否持久化的属性不能在队列创建后修改,否则将会抛出异常,如果实在想要修改,那么需要删除这个队列再重新创建

可以使用web删除队列,也可以用代码控制队列的删除:

channel.queueDelete("队列名称");

消息的持久化

队列的持久化是表示重启后队列不会丢失,但队列中的消息不一定会保存,消息的持久化会保存队列中的消息

在生产者发送消息时,所需的参数:

// 发送消息
// 参数1:交换机名称
// 参数2:可以为队列的名称
// 参数3:其他的参数信息
// 参数4:消息的内容
channel.basicPublish("", "队列名称", null, "hello world".getBytes());

其中参数3可以设置所发送的消息需要持久化保存,可以使用MessageProperties.PERSISTENT_TEXT_PLAIN进行标识这条消息是持久保存的

例如:

channel.basicPublish("", "持久化测试3", MessageProperties.PERSISTENT_TEXT_PLAIN, "消息内容".getBytes());

这样重启RabbitMQ之后,消息将会被保存下来

不公平分发

在多线程轮询的分发消息的方式中,每个线程将会依次的获得消息,但如果此时到了某个线程接收消息了,但这个线程还没有处理完之前的消息,那么此时需要等待这个线程处理完之后将消息传给这个线程后,其他线程才能继续接收处理消息,效率比较低。

为提高效率,本着能者多劳,在一个线程空闲时,可让其接着处理消息,推荐使用这种方式提高效率

在接收消息时,设置信道的basicQos(1)即可:

channel.basicQos(1);

如果值为0就代表不公平分发

例如:

new Thread(() -> {
            try {
                Connection connection = ConnectionUtil.getConnection();
                Channel channel = connection.createChannel();
                channel.basicQos(1);
                DeliverCallback deliverCallback = (tag, msg) -> {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    System.out.println("第一个线程接收到消息:" + new String(msg.getBody()));
//                    // 手动确认消息
                    channel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
//                    // 否定拒绝
//                    channel.basicNack(msg.getEnvelope().getDeliveryTag(), false, true);
                    // 否定拒绝确认
//                    channel.basicReject(msg.getEnvelope().getDeliveryTag(), true);
                };

                CancelCallback cancelCallback = (tag) -> {
                    System.out.println("取消接收");
                };
                System.out.println("第一个线程等待接收:");
                channel.basicConsume("队列名称", false, deliverCallback, cancelCallback);
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }
        }).start();

发送消息测试结果:

第一个线程等待接收:
第二个线程等待接收:
第三个线程等待接收:
第二个线程接收到消息:2222222222222
第三个线程接收到消息:333333333333333
第二个线程接收到消息:444444444444444
第三个线程接收到消息:555555555555555
第一个线程接收到消息:1111111111111
第二个线程接收到消息:6666666666
第三个线程接收到消息:7777777777777777777
第二个线程接收到消息:9999999999999999999999
第三个线程接收到消息:101010
第一个线程接收到消息:88888888888

预取值

prefetch中文为预取,规定某个线程需要取出的消息次数。

同样也是通过channel.basicQos(数字)进行设置

假设有3个线程,总共8条消息,第一个线程预取值为5,第二个线程预取值为1,第三个线程预取值为2,如果线程按照线程1、线程2、线程3的顺序进行运行,第二个线程必须等第一个线程处理完5个消息后才能进行取出消息,同样的第三个线程需要等第二个线程。满足预取值的前提是队列中的待处理的消息足够多。

例如此时信道8个消息,那么此时完全按照以上的顺序进行,如果信道中十几条消息,并且线程1处理时间比较长,有可能线程1只会处理5条消息,其他的消息线程2可能要比线程3处理的多

发布确认

在队列、队列中的消息进行持久化成功后将会向生产者返回发布确认以代表回应,这个回应就是发布确认,只有进行发布确认之后才能确定生产者发送的消息是绝对不会丢失的

开启发布确认:

channel.confirmSelect();

confirm中文为确认,读音为kənˈfərm

单个发布确认

是一种的同步确认的方式,也就是发布一条消息之后,只有收到确认之后才能够继续发送下一条消息,但发布速度特别慢

通过:channel.waitForConfirms();可以进行发布确认

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();
long before = System.currentTimeMillis();
channel.queueDelete("持久化测试3");
for (int i = 0; i < 100000; i++) {
    channel.basicPublish("", "持久化测试3", null, ("0.0-消息" + i).getBytes());
    // 确认发布
    channel.waitForConfirms();
}
System.out.println("发送时间:" + (System.currentTimeMillis() - before));

以上代码尝试发送十万条消息入队列,经过测试如果不确认发布,大概需要1000ms左右,如果进行发布确认,大概需要10000ms

批量发布确认

发布一批消息然后再进行确认能够极大的提高吞吐量,但如果某条消息发布失败时不知道是哪个出的问题,发送的指定的条数再进行确认

for (int i = 0; i < 100000; i++) {
            channel.basicPublish("", "持久化测试3", null, ("0.0-消息" + i).getBytes());
            if (i % 1000 == 0) {
                channel.waitForConfirms();
            }
        }

运行时间大概在1300ms左右

异步确认发布

性价比最高,是通过回调来保证一个消息是否发送成功

发送消息时,给每个消息进行编号,生产者无需等待每个消息的确认,只需要直接发就行,如果出现发送失败,失败的消息会自动的通知

需要给信道添加两个事件,调用channel.addConfirmListener(发送成功的回调, 发送失败的回调),这个方法还有一个重载只有一个事件的参数,这个事件仅代表发送成功的回调

两个回调都是ConfirmCallback类型的

long before = System.currentTimeMillis();
channel.queueDelete("持久化测试3");
channel.queueDeclare("持久化测试3", false, false, true, null);
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
    // 发送成功的回调
};
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
    // 发送失败的回调
    System.out.println(deliveryTag + "发送失败");
};
channel.addConfirmListener(ackCallback, nackCallback);
for (int i = 0; i < 100000; i++) {
    channel.basicPublish("", "持久化测试3", null, ("0.0-消息" + i).getBytes());
}
System.out.println("发送时间:" + (System.currentTimeMillis() - before));

处理异步未确认的消息

可以将这些消息放到一个生产者能够访问到的队列,可以使用ConcurrentLinkedHashMap

  • 通过channel.getNextPublishSeqNo()可以获取到将要发送的下一个消息的id
  • 每次发消息可以将消息放到一个Map中,以id为键,消息为值
  • 在发送成功的回调中把相应id的消息删除掉
  • 剩下的消息就是没有发送成功的
channel.confirmSelect();
// 记录所发送的消息
Map<Long, String> record = new ConcurrentSkipListMap<>();
long before = System.currentTimeMillis();
channel.queueDelete("持久化测试3");
channel.queueDeclare("持久化测试3", false, false, true, null);
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
    // 发送成功的回调,删除发送成功的消息
    record.remove(deliveryTag);
};
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
    // 发送失败的回调
    System.out.println(deliveryTag + "发送失败");
};
channel.addConfirmListener(ackCallback, nackCallback);
for (int i = 0; i < 100000; i++) {
    // 获取下一次发送的序号
    record.put(channel.getNextPublishSeqNo(), ("0.0-消息" + i));
    channel.basicPublish("", "持久化测试3", null, ("0.0-消息" + i).getBytes());
}
System.out.println("发送时间:" + (System.currentTimeMillis() - before));

交换机 exchange

如果不指定交换机,将会走默认交换机。在之前的例子中,一个消息只能被消费者消费一次,如果一个消息要被多个消费者消费多次,那么需要使用交换机,这种模式称为发布订阅模式

这种方式的大致按照:生产者->交换机->队列1…队列n->消费者…

生产者将消息发送给交换机,由交换机把这些消息放到队列中

类型:

  • 直接(direct),也被称为路由类型
  • 主题(topic),中文为话题、标题、题目,读音为ˈtɑːpɪk
  • 标题(headers),已经不常用了
  • 扇出(fanout),就是发布订阅模式
  • 无名(exchange),读音为ɪksˈtʃeɪndʒ中文为交换

之前发布消息时的代码:

channel.basicPublish("交换机", "名称", 参数, 消息的内容);

如果交换机的名字是空串表示默认或者无名交换机,第二个参数为routing key,是通过这个绑定队列的

创建:

channel.exchangeDeclare("交换机名称", "交换机类型");

临时队列

临时队列是指断开连接将会消失的队列

临时队列创建方式:

// 可以获取到临时队列名称,为随机的字符串
String name = channel.queueDeclare().getQueue();

绑定

可以把一个队列绑定到交换机

channel.queueBind("队列名称", "交换机名称", "routingKey");

fanout交换机

将收到的所有消息广播到它所知道的所有队列中

绑定并接收消息:

  • 获取连接工厂,并创建信道
  • 通过信道声明一个交换机
  • 通过信道创建一个队列
  • 通过信道将队列绑定到交换机channel.queueBind("队列名称", "交换机名称", "routingKey")
  • 调用信道的channel.basicConsume("队列名称", 接收回调, 取消接收回调)接收消息

发送消息:

  • 发送消息与之前的步骤一致

  • 需要指定发送到的交换机

  • channel.basicPublish("交换机名称", "routingKey", 参数, 消息的byte[]);
    
// 向交换机发送消息
public class SendMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.basicPublish("扇出", "队列222", null, "这是往队列22发送的消息".getBytes());
    }
}

客户端1:

Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare("扇出", "fanout");
String queue2 = channel.queueDeclare().getQueue();
channel.queueBind(queue2, "扇出", "临时队列1");
channel.basicConsume(queue2, (consumerTag, message) -> {
    System.out.println("接收队列1的消息:" + new String(message.getBody()));
}, tag -> {
    System.out.println("取消接收");
});

客户端2:

Channel channel = connection.createChannel();
channel.exchangeDeclare("扇出", "fanout");
String queue2 = channel.queueDeclare().getQueue();
// 客户端2可以不用重复声明交换机,只要是绑定上就可以了
// channel.queueBind(queue2, "扇出", "临时队列2");
channel.basicConsume(queue2, (consumerTag, message) -> {
    System.out.println("接收队列2的消息:" + new String(message.getBody()));
}, tag -> {
    System.out.println("取消接收");
});

向这个交换机任何一个routingKey发送消息,这个时候可以看到这两个客户端的队列中都可以收到消息,交换机可以不用重复声明,只要通过channel.queueBind绑定到了一个交换机就可以

direct交换机

也成为路由模式,同一个交换机中是按照routingKey进行传送消息的

  • 需要声明一个交换机:channel.exchangeDeclare("交换机名称", "direct")
  • 声明队列
  • 队列绑定交换机:channel.queueBind("队列名称", "交换机名称", "routingKey")
  • 发送消息:channel.basicPublish("交换机", "routingKey", 参数, 消息内容);

接收者1:

Channel channel = connection.createChannel();
channel.exchangeDeclare("直接", "direct");
String queue2 = channel.queueDeclare().getQueue();
channel.queueBind(queue2, "直接", "临时队列1");
channel.basicConsume(queue2, (consumerTag, message) -> {
    System.out.println("接收routingKey=临时队列的消息:" + new String(message.getBody()));
}, tag -> {
    System.out.println("取消接收");
});

接收者2:

String queue2 = channel.queueDeclare().getQueue();
channel.queueBind(queue2, "直接", "flag");
channel.basicConsume(queue2, (consumerTag, message) -> {
    System.out.println("接收routingKey=flag的消息:" + new String(message.getBody()));
}, tag -> {
    System.out.println("取消接收");
});

接收者3:

String queue2 = channel.queueDeclare().getQueue();
channel.queueBind(queue2, "直接", "flag");
channel.basicConsume(queue2, (consumerTag, message) -> {
    System.out.println("接收routingKey=flag的消息:" + new String(message.getBody()));
}, tag -> {
    System.out.println("取消接收");
});

如果向routingKey=临时队列1进行发送消息时,只有接收者1能够收到消息,如果向routingKey=flag,接收者2和接收者3都能收到消息,一个队列可以绑定多个routingKey和交换机,只需要多次调用channel.queueBind()即可

topic交换机

主题交换机的routingKey是一个单词列表,并且以.隔开,例如my.rabbit.mq.topic,此种情况下routingKey最长为255字节,其中*可以代表一个单词,#可以代表0个或者多个单词

是通过单词进行匹配队列的

生产者
生产者
exchange
exchange
queue1
queue1
queue3
queue3
queue2
queue2
.first.
.first.

second.#
second.#
..third
..third
Text is not SVG - cannot display

上图中有3个队列,每个队列都有不同的routingKey

  • aa.first.bbb代表匹配queue1
  • second.aaf.af.ads.fad.f.af.adf.adsf.ads.fad.f.das代表匹配queue2
  • second.first.jjjjj代表匹配queue1queue2
  • second.first.third代表匹配queue1queue2queue3

当一个队列绑定的是#,代表这个队列接收所有的消息,这个队列类似于fanout中的队列,当一个队列中没有出现#*时,代表这个队列的绑定类似于direct

针对以上消息队列,代码为:

所有消息的接收者:

channel.exchangeDeclare("主题", "topic");
// # 代表可以收到所有的消息
channel.queueBind(queue, "主题", "#");
channel.basicConsume(queue, (consumerTag, message) -> {
    System.out.println("所有的消息:" + new String(message.getBody()));
}, consumerTag -> {

});

队列1接收者:

String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, "主题", "*.first.*");
channel.basicConsume(queue, (consumerTag, message) -> {
    System.out.println("队列1接收到消息:" + new String(message.getBody()));
}, consumerTag -> {

});

队列2接收者:

Channel channel = connection.createChannel();
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, "主题", "second.#");
channel.basicConsume(queue, (consumerTag, message) -> {
    System.out.println("队列2接收到消息:" + new String(message.getBody()));
}, consumerTag -> {

});

队列3接收者:

String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, "主题", "*.*.third");
channel.basicConsume(queue, (consumerTag, message) -> {
    System.out.println("队列3接收到消息:" + new String(message.getBody()));
}, consumerTag -> {

});

消息发送者:

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicPublish("主题", "second.aaf.af.ads.fad.f.af.adf.adsf.ads.fad.f.das", null, "rerrrrrr".getBytes());

死信队列

死信:无法被消费的消息。

应用场景:下单后未支付的订单在指定的时间过后消失。

可以作为延迟消息处理

来源:

  • 消息TTL(存活时间)过期
  • 队列达到最大长度
  • 消息被拒绝,并且拒绝后的消息不放回到队列中

生产者
生产者
成为死信
成为死信
普通队列
普通队列
交换机
交换机
消费者
消费者
死信交换机
死信交换机
死信队列
死信队列
消费者
消费者
Text is not SVG - cannot display

生产者将消息发送给交换后,如果遇到特殊情况造成死信,将会通过死信交换机将消息放到死信队列

让队列之间建立联系:

  • 在创建队列时,可以看到:

    // 创建一个队列,参数含义:
    // 参数1:队列名称
    // 参数2:是否要进行持久化,默认消息是保存在内存中的
    // 参数3:是否要将这个队列中的消息共享(true为这个队列仅限于当前连接使用)
    // 参数4:如果长时间不使用这个队列,是否要进行删除
    // 参数5:其他参数Map<String, Object>类型
    channel.queueDeclare("队列名称", true, false, true, null);
    
  • 其中最后一个参数可以用来建立两个队列的联系

消费者1:

package dead;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.TimeoutException;

// 作为消费者1(普通的队列)
public class Consumer1 {
    public static final String NORMAL_EXCHANGE = "普通交换机", NORMAL_QUEUE = "普通队列";
    public static final String DEAD_EXCHANGE = "死信交换机", DEAD_QUEUE = "死信队列";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");

        HashMap<String, Object> arguments = new HashMap<>();
//        // 所有消息的超时时间
//        arguments.put("x-message-ttl", 10000);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置死信routingKey
        arguments.put("x-dead-routing-key", "dead");
        // 指定队列的最大长度
        arguments.put("x-max-length", 5);
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
        Random random = new Random();

        channel.basicConsume(NORMAL_QUEUE, true, (consumerTag, message) -> {
            if (random.nextInt(100) % 2 == 0) {
                System.out.println("模拟拒绝消息....");
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
                return;
            }
            System.out.println("常规队列接收消息:" + new String(message.getBody()));
//            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        }, consumerTag -> {});
    }
}

消费者2(死信):

package dead;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

// 作为消费者2(死信队列)
public class Consumer2 {
    public static final String NORMAL_EXCHANGE = "普通交换机", NORMAL_QUEUE = "普通队列";
    public static final String DEAD_EXCHANGE = "死信交换机", DEAD_QUEUE = "死信队列";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.basicConsume(NORMAL_QUEUE, (consumerTag, message) -> {
            System.out.println("死信队列接收消息:" + new String(message.getBody()));
        }, consumerTag -> {});
    }
}

生产者:

package dead;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

// 生产者
public class Product {
    public static final String NORMAL_EXCHANGE = "普通交换机", NORMAL_QUEUE = "普通队列";
    public static final String DEAD_EXCHANGE = "死信交换机", DEAD_QUEUE = "死信队列";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 设置这条消息超时时间
        AMQP.BasicProperties props = new AMQP.BasicProperties()
                .builder()
                .expiration("10000")
                .build();
        Scanner scanner = new Scanner(System.in);
        while (true) {
            channel.basicPublish(NORMAL_EXCHANGE, "normal", props, scanner.nextLine().getBytes());
        }
    }
}

整合Spring Boot

引入依赖:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

配置:

spring:
  rabbitmq:
    host: IP
    port: 端口
    username: 用户名
    password: 密码

创建队列、创建交换机、绑定交换机队列都需要一个配置类

创建队列:

@Bean
public Queue 方法名() {
    return QueueBuilder
            .nonDurable("队列名称") // nonDurable代表不持久化,durable代表持久化
            .ttl(时间) // 超时时间
            .deadLetterExchange("交换机名称") // 设置死信交换机
            .deadLetterRoutingKey("routing key") // 发往死信交换机的Routing Key
            .build();
}

创建交换机:

// 返回值是各种类型的交换机
@Bean
public DirectExchange 方法名() {
    // 根据需要选择相关重载的构造方法
    return new DirectExchange(NORMAL_EXCHANGE, false, false);
}

绑定交换机:

// 参数可以是用注入队列和交换机
@Bean
public Binding 方法名(@Qualifier("normalQueue1") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {
    return BindingBuilder
            .bind(queue实例)
            .to(exchange实例)
            .with("routing key");
}

接收消息也要放到一个单独的类中,在处理某个队列的方法上添加@RabbitListener注解

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class DeadQueueConsumer {

    @RabbitListener(queues = "队列名称")
    public void receive(Message message) {
        log.info("接收的消息内容:{},现在的时间:{}", new String(message.getBody()), new Date().toString());
    }
}

发送消息只需要在需要的地方声明RabbitTemplate成员变量,进行注入就可以

例如:

private final RabbitTemplate rabbitTemplate;

public void sendToQueue1(String message) {
    rabbitTemplate.convertAndSend("队列名称", "routing key", 消息Object);
    log.info("发送到队列1的消息:{}", message);
}

延迟队列

是死信队列的一种,属于消息的TTL过期的情况

应用场景:

  • 长时间未支付的订单自动取消
  • 在某个时间点发送消息

都是在某个时间点之后执行的任务

案例:

生产者
生产者
普通交换机
普通交换机
队列1
队列1
队列2
队列2
10s
10s
20s
20s
死信交换机
死信交换机
死信队列
死信队列
消费者
消费者
direct
direct
direct
direct
key1
key1
key2
key2
dead
dead
dead
dead
dead
dead
Text is not SVG - cannot display

实现上图的延迟队列

package com.xiaoxu.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    public static final String NORMAL_EXCHANGE = "普通交换机", NORMAL_QUEUE1 = "队列1", NORMAL_QUEUE2 = "队列2";
    public static final String DEAD_EXCHANGE = "死信交换机", DEAD_QUEUE = "死信队列";

    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE, false, false);
    }

    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE, false, false);
    }

    @Bean
    public Queue normalQueue1() {
        return QueueBuilder
                .nonDurable(NORMAL_QUEUE1)
                .ttl(20000)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey("dead")
                .build();
    }

    @Bean
    public Queue normalQueue2() {
        return QueueBuilder
                .nonDurable(NORMAL_QUEUE2)
                .ttl(10000)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey("dead")
                .build();
    }

    @Bean
    public Queue deadQueue() {
        return QueueBuilder
                .nonDurable(DEAD_QUEUE)
                .build();
    }

    @Bean
    public Binding normalQueue1Binding(@Qualifier("normalQueue1") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("key1");
    }

    @Bean
    public Binding normalQueue2Binding(@Qualifier("normalQueue2") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("key2");
    }
    @Bean
    public Binding deadQueueBinding(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") DirectExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("dead");
    }
}
package com.xiaoxu.rabbitmq.service;

import com.xiaoxu.rabbitmq.config.QueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendService {
    private final RabbitTemplate rabbitTemplate;

    public void sendToQueue1(String message) {
        rabbitTemplate.convertAndSend(QueueConfig.NORMAL_EXCHANGE, "key1", message);
        log.info("发送到队列1的消息:{}", message);
    }

    public void sendToQueue2(String message) {
        rabbitTemplate.convertAndSend(QueueConfig.NORMAL_EXCHANGE, "key2", message);
        log.info("发送到队列2的消息:{},现在的时间:{}", message, new Date().toString());
    }

    public SendService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
}
package com.xiaoxu.rabbitmq.service;


import com.xiaoxu.rabbitmq.config.QueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class DeadQueueConsumer {

    @RabbitListener(queues = QueueConfig.DEAD_QUEUE)
    public void receive(Message message) {
        log.info("接收的消息内容:{},现在的时间:{}", new String(message.getBody()), new Date().toString());
    }
}

以上是延迟队列的一个实现,但现在存在着一个问题,当有新的需求时(例如需要一个其他秒数的延迟队列)只能新增加一个队列,因此可以考虑创建一个通用的队列,这个队列不设置延迟,给消息设置延迟,一个消息达到相应的时间后自动放到延迟队列中

设置发消息时的存活时间:

  • 发送时,使用rabbitTemplate.convertAndSend(String, String, Object, MessagePostProcessor)方法

  • 参数1为交换机名称,参数2为routing key,参数3为具体的消息,参数4为一个接口

  • 在这个接口中可以通过Message类型的参数获取到这个消息的属性,通过属性设置时间,最后再返回这个消息

  • rabbitTemplate.convertAndSend("交换机名称", "routing key", 消息, msg -> {
        msg.getMessageProperties().setExpiration("时间");
        return msg;
    });
    

使用这种方式是有缺点的,当队列中有多条消息,每条消息的时间都不同,那么将会以第一个消息的时间为准,即使第二条消息比第一条延迟短。因为RabbitMQ只会检查当前队首的第一条消息是否过期,如果第一条消息的延时很长,第二条消息的延时很短,第二条消息并不会优先执行

使用插件实现延迟队列

下载插件rabbitmq_delayed_message_exchange,下载.ez结尾的文件

将文件移动到:

# linux
/usr/lib/rabbitmq/lib/rabbitmq_server-3.10.6/plugins

# Windows
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.6\plugins

执行:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange           

启用插件,然后重启RabbitMQ

如果安装成功了,进入web管理页面新建交换机的类型里边将会包含一个 x-delayed-message类型的交换机,表明延迟队列将会由交换机进行

延迟交换机的配置

delayed中文为延迟,读音为dɪˈleɪd

在这种交换机中,消息发送后并不会直接放到队列中,而是放到一个表中,等到时间到了再投递到队列中

  • 需要创建交换机,指定交换机的类型为x-delayed-message

    • 方法的返回值设置为CustomExchange

      • @Bean
        public CustomExchange delayExchange() {
            HashMap<String, Object> arguments = new HashMap<>();
            // 设置路由类型
            arguments.put("x-delayed-type", "direct");
            return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, arguments);
        }
        
    • 交换机的延迟类型为之前交换机的一种,目的是设置匹配队列的方式

  • 创建普通的队列,使用一个routing key绑定交换机

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

@Configuration
public class DelayedConfig {
    public static final String DELAYED_EXCHANGE = "延迟交换机", DELAYED_QUEUE = "延迟队列";
    public static final String ROUTING_KEY = "KEYS";

    @Bean
    public CustomExchange delayExchange() {
        HashMap<String, Object> arguments = new HashMap<>();
        // 设置路由类型
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, arguments);
    }

    @Bean
    public Queue delayQueue() {
        return QueueBuilder
                .nonDurable(DELAYED_QUEUE)
                .build();
    }

    @Bean
    public Binding delayQueueBinding(@Qualifier("delayExchange") CustomExchange exchange, @Qualifier("delayQueue") Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_KEY)
                .noargs();
    }
}

发送消息时,不再使用setExpiration(时间)设置延迟的时间,而是使用setDelayed(时间)进行设置

public void sendToDelayedQueue(String message, Integer time) {
    log.info("消息内容为:{},发到延迟队列的延迟为:{}", message, time);
    rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, DelayedConfig.ROUTING_KEY, message, msg -> {
        msg.getMessageProperties().setDelay(time);
        return msg;
    });
}

发布确认(高级)

解决当RabbitMQ宕机之后发过来的消息处理

解决交换机收不到消息的处理方案

每次发送到交换机的消息,交换机都会使用回调接口

  • 需要在配置文件中,设置:

    spring:
      rabbitmq:
        publisher-confirm-type: correlated
    

    correlated中文为相关,读音为ˈkɔːrəleɪtɪd

    • 这个配置项的值包括:
      • none:默认值
      • correlated:发送消息到交换机后回调方法
      • simple:有两个效果,其中一个效果与correlated效果一致,另一个效果相当于单步确认
  • 自定义类实现:RabbitTemplate.ConfirmCallback接口,并在这个类中添加RabbitTemplate成员变量,并在构造器中注入,注入后,调用RabbitTemplate中的setConfirmCallback(this)方法,设置回调对象为当前类的实例

    • 需要实现这个接口的confirm(CorrelationData correlationData, boolean ack, String cause)方法
      • 参数1为消息对应的实例
      • 参数2为是否被交换机接受
      • 参数3为原因
  • import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class MyExchangeCallback implements RabbitTemplate.ConfirmCallback {
        private RabbitTemplate template;
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                log.info("交换机收到消息...,尝试查看消息的内容:{}", correlationData);
            } else {
                ReturnedMessage returned = correlationData.getReturned();
                log.info("交换机没有收到消息, 原因:{},消息内容:{},发往的交换机:{},routingKey:{}", cause, new String(returned.getMessage().getBody()),
                        returned.getExchange(), returned.getRoutingKey());
            }
        }
    
        public MyExchangeCallback(RabbitTemplate template) {
            this.template = template;
            template.setConfirmCallback(this);
        }
    }
    

这个时候可以发现,无论消息发送成功或者失败,第一个参数correlationData一直是null,这是因为在发送消息时需要手动填写这个参数

public void sendToOne(String message) {
    log.info("生产者发到交换机的消息:{}", message);
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    correlationData.setReturned(new ReturnedMessage(new Message(message.getBytes()), 404, "",
            "一个交换机", "一个key"));
    rabbitTemplate.convertAndSend("一个交换机", "一个key", message, correlationData);
}

回退消息

以上仅针对交换机的处理方案,如果交换机发送到队列时出现了问题或者routingKey出了问题,这个时候不会给出失败的提示。

回退消息:如果消息在发往队列时出现了问题,可以把出现问题的消息会退给生产者

  • 配置文件中设置:

    • spring:
        rabbitmq:
          publisher-returns: true
      
  • 自定义类实现RabbitTemplate.ReturnsCallback接口:

    • @Component
      @Slf4j
      public class MyMessageReturnCallback implements RabbitTemplate.ReturnsCallback {
          private RabbitTemplate template;
      
          public MyMessageReturnCallback(RabbitTemplate template) {
              this.template = template;
              template.setReturnsCallback(this);
          }
      
          @Override
          public void returnedMessage(ReturnedMessage returned) {
              log.error("回退消息,回退的内容为:{},所在交换机:{},routingKey:{},reply:{}", new String(returned.getMessage().getBody()),
                      returned.getExchange(), returned.getRoutingKey(), returned.getReplyText());
          }
      }
      

优先级队列

按照队列中的优先级进行取出消息,优先级越大,越先处理,范围1-255,官方文档建议1-10

@Bean
public Queue priority() {
    return QueueBuilder
            .nonDurable("priority")
            .maxPriority(10)
            .build();
}

以上代码可以设置队列最大的优先级

发送消息时也需要设置优先级第7行:

public void sendToOne(String message, int priority) {
    log.info("生产者发到交换机的消息:{}", message);
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    correlationData.setReturned(new ReturnedMessage(new Message(message.getBytes()), 404, "",
            "一个交换机", "priority"));
    rabbitTemplate.convertAndSend("一个交换机", "priority", message, msg -> {
        msg.getMessageProperties().setPriority(priority);
        return msg;
    }, correlationData);
}

惰性队列

一般情况下,消息是存放到内存中的,而惰性队列是消息存放到磁盘中的。惰性队列消费速度比较慢,当消息堆积的比较多时,可以考虑使用惰性队列

@Bean
public Queue priority() {
    return QueueBuilder
            .nonDurable("priority")
            .maxPriority(10)
            .lazy() // 惰性队列
            .build();
}

Q.E.D.


念念不忘,必有回响。