消息中间件
RabbitMQ
使用
导入依赖
生产者
- 新建Connection工厂,设置参数
- 新建Connection连接
- 新建channel
- 声明一个队列
- 使用basicPublish发布消息
1 | package com.oddfar.one; |
- 消费者
- 新建Connection工厂,设置参数
- 新建Connection连接
- 新建channel
- 使用basicConsume发布消息
1 | package com.oddfar.one; |
消息应答(消费者)
1.自动应答,队列发出消息后默认收到,删除消息
2.手动应答,需要消费者在接收消息回调中使用basicACK确认消息,确认后才删除消息
Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
Channel.basicNack(用于否定确认)
Channel.basicReject(用于否定确认)
与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了
持久化
交换机持久化,声明时设置
队列持久化,声明时设置
1 | //让队列持久化 |
消息持久化,发送消息设置参数
不公平分发
给channel设置参数
1 | //不公平分发 |
prefetchCount预取值
仍未ack的个数,到达上限先不给它分配
交换机
创建交换机
1 | Channel channel = RabbitMqUtils.getChannel(); |
类型
1.fanout发送到他绑定的所有队列
2.direct发送到他绑定的指定队列,根据生产者指定的bindingKey路由
1 | channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8")); |
3.topic主要是设置的bindingKey可以为一个单词列表,以点号分隔开,根据生产者指定的bindingKey路由
其中bindingKey
- ***(星号)可以代替一个单词**
- #(井号)可以替代零个或多个单词
死信队列
死信的来源
消息 TTL 过期
TTL是Time To Live的缩写, 也就是生存时间
生产者可以设置TTL属性
1
2
3
4
5
6
7
8
9
10
11Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//设置消息的 TTL 时间 10s
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//该信息是用作演示队列个数限制
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
System.out.println("生产者发送消息:" + message);
}队列达到最大长度
队列满了,无法再添加数据到 mq 中
在创建队列是声明最大长度
消息被拒绝
(basic.reject 或 basic.nack) 并且 requeue=false.
1 | channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); |
创建
和普通交换机和队列一样
多了队列出现死信时应该发到哪个交换机
1 | //声明死信和普通交换机 类型为 direct |
延迟队列
队列设置TTL
过期就扔到死信队列
1
2
3
4
5
6
7
8Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();消息设置TTL
即将投递到消费者之前判定
同时设置使用较小值
插件
可以设置一个延迟交换机
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 好高骛远!