RocketMQ · 官方网站 | RocketMQ

简介

RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  1. 能够保证严格的消息顺序
  2. 提供丰富的消息拉取模式
  3. 高效的订阅者水平扩展能力
  4. 实时的消息订阅机制
  5. 亿级消息堆积能力

为什么要使用MQ

  1. 要做到系统解耦,当新的模块进来时,可以做到代码改动最小; 能够解耦
  2. 设置流程缓冲池,可以让后端系统按自身吞吐能力进行消费,不被冲垮; 能够削峰,限流
  3. 强弱依赖梳理能把非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步

Mq的作用 削峰限流 异步 解耦合

定义

是指利用高效可靠的消息传递机制进行与平台无关(跨平台)的数据交流,并基于数据通信来进行分布式系统的集成

通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储,流量削峰,异步通信,数据同步等

大致流程如下

发送者把消息发给消息服务器,消息服务器把消息存放在若干队列/主题中,在合适的时候,消息服务器会把消息转发给接受者。在这个过程中,发送和接受是异步的,也就是发送无需等待,发送者和接受者的生命周期也没有必然关系在发布pub/订阅sub模式下,也可以完成一对多的通信,可以让一个消息有多个接受者

特点

异步处理模式

消息发送者可以发送一个消息而无需等待响应。消息发送者把消息发送到一条虚拟的通道(主题或队列)上

消息接收者则订阅或监听该通道。一条信息可能最终转发给一个或多个消息接收者,这些接收者都无需对消息发送者做出回应。整个过程都是异步的

应用系统的解耦

发送者和接收者不必了解对方,只需要确认消息

发送者和接收者不必同时在线

各个MQ产品的比较

RocketMQ重要概念

Producer:消息的发送者,生产者;举例:发件人

Consumer:消息接收者,消费者;举例:收件人

Broker:暂存和传输消息的通道;举例:快递

NameServer:管理Broker;举例:各个快递公司的管理机构,相当于broker的注册中心,保留了broker的信息

Queue:队列,消息存放的位置,一个Broker中可以有多个队列

Topic:主题,消息的分类

ProducerGroup:生产者组

ConsumerGroup:消费者组,多个消费者组可以同时消费一个主题的消息

消息发送的流程是,Producer询问NameServer,NameServer分配一个broker 然后Consumer也要询问NameServer,得到一个具体的broker,然后消费消息

生产和消费

RocketMQ安装

下载RocketMQ

下载地址:https://rocketmq.apache.org/dowloading/releases/

上传服务器

在root目录下创建文件夹

1
mkdir rocketmq

解压

1
unzip rocketmq-all-4.9.2-bin-release.zip

配置环境变量

1
sudo vim /etc/profile

在文件末尾添加

1
export NAMESRV_ADDR=公网IP:9876

使配置生效

1
source /etc/profile

修改nameServer的运行脚本

进入bin目录下,修改runserver.sh文件,将71行和76行的Xms和Xmx等改小一点

1
vim runserver.sh

保存退出

修改broker的运行脚本

进入bin目录下,修改runbroker.sh文件,修改67行

修改broker的配置文件

进入conf目录下,修改broker.conf文件

在文件末尾添加三行

1
2
3
namesrvAddr=localhost:9876
autoCreateTopicEnable=true
brokerIP1=公网IP
  • namesrvAddr:nameSrv地址 可以写localhost因为nameSrv和broker在一个服务器
  • autoCreateTopicEnable:自动创建主题,不然需要手动创建出来
  • brokerIP1:broker需要一个公网ip

启动

在目录下创建一个logs文件夹,用于存放日志

1
mkdir logs

启动nameSrv

1
nohup sh bin/mqnamesrv > ./logs/namesrv.log &

启动broker 这里的-c是指定使用的配置文件

1
nohup sh bin/mqbroker -c conf/broker.conf > ./logs/broker.log &
  • nohup:确保命令在终端关闭后仍然继续运行

输入jps查看启动结果

安装RocketMQ-Console(可选)

可视化平台dashboard

网址: https://github.com/apache/rocketmq-dashboard

下载后解压出来unzip

1
2
3
mvn clean package -Dmaven.test.skip=true
cd target
nohup java -jar ./rocketmq-dashboard-1.0.0.jar --server,port:8001 rocketmq.config.namesrvAddr=127.0.0.1:9876 > dashboard.log &
  • –server.port指定运行的端口
  • –rocketmq.config.namesrvAddr=127.0.0.1:9876 指定namesrv地址

RocketMQ快速入门

消息发送和监听的流程

消息生产者

  1. 创建消息生产者producer,并制定生产者组名
  2. 指定Nameserver地址
  3. 启动producer
  4. 创建消息对象,指定主题Topic、Tag和消息体等
  5. 发送消息
  6. 关闭生产者producer

消息消费者

  1. 创建消费者consumer,制定消费者组名
  2. 指定Nameserver地址
  3. 创建监听订阅主题Topic和Tag等
  4. 处理消息
  5. 启动消费者consumer

搭建

加入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
<version>4.4.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>

编写生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 测试生产者
*
* @throws Exception
*/
@Test
public void testProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息
// 第一个参数:主题的名字
// 第二个参数:消息内容
Message msg = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes());
SendResult send = producer.send(msg);
System.out.println(send);
}
// 关闭实例
producer.shutdown();
}

编写消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
   /**
* 测试消费者
*
* @throws Exception
*/
@Test
public void testConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
// 设置nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息
consumer.subscribe("TopicTest", "*");
// 注册一个消费监听 MessageListenerConcurrently 是多线程消费,默认20个线程,可以参看consumer.setConsumeThreadMax()
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "----" + msgs);
// 返回消费的状态 如果是CONSUME_SUCCESS 则成功,若为RECONSUME_LATER则该条消息会被重回队列,重新被投递
// 重试的时间为messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// 也就是第一次1s 第二次5s 第三次10s .... 如果重试了18次 那么这个消息就会被终止发送给消费者
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 这个start一定要写在registerMessageListener下面
consumer.start();
System.in.read();
}

测试

启动生产者和消费者进行测试

消费模式

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull

Push是服务端【MQ】主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃

Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时

Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式

不管是push还是pull都是pull模式,通过长轮询实现

发送同步消息

发送同步消息,发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方式

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知

异步消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void testAsyncProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
Message msg = new Message("TopicTest", ("异步消息").getBytes());
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败");
}
});
System.out.println("看看谁先执行");
// 挂起jvm 因为回调是异步的不然测试不出来
System.in.read();
// 关闭实例
producer.shutdown();
}

异步消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void testAsyncConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
// 设置nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息
consumer.subscribe("TopicTest", "*");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 这里执行消费的代码 默认是多线程消费
System.out.println(Thread.currentThread().getName() + "----" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}

发送单向消息

这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送

单向消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void testOnewayProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
Message msg = new Message("TopicTest", ("单向消息").getBytes());
// 发送单向消息
producer.sendOneway(msg);
// 关闭实例
producer.shutdown();
}

单向消息消费者

消费者和上面一样

发送延迟消息

消息放入mq后,过一段时间,才会被监听到,然后消费

比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存

延迟消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void testDelayProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
Message msg = new Message("TopicTest", ("延迟消息").getBytes());
// 给这个消息设定一个延迟等级
// messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3);
// 发送单向消息
producer.send(msg);
// 打印时间
System.out.println(new Date());
// 关闭实例
producer.shutdown();
}

延迟消息消费者

消费者和上面一样

这里注意的是RocketMQ不支持任意时间的延时

只支持以下几个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

发送顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为:分区有序或者全局有序。

可能大家会有疑问,mq不就是FIFO吗?

rocketMq的broker的机制,导致了rocketMq会有这个问题

因为一个broker中对应了四个queue

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取(单线程),则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的

下面用订单进行分区有序的示例。一个订单的顺序流程是:下订单、发短信通知、物流、签收。订单顺序号相同的消息会被先后发送到同一个队列中,消费时,同一个顺序获取到的肯定是同一个队列

场景分析

模拟一个订单的发送流程,创建两个订单,发送的消息分别是

订单号111 消息流程 下订单->物流->签收

订单号112 消息流程 下订单->物流->拒收

创建一个订单对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
/**
* 订单id
*/
private Integer orderId;

/**
* 订单编号
*/
private Integer orderNumber;

/**
* 订单价格
*/
private Double price;

/**
* 订单号创建时间
*/
private Date createTime;

/**
* 订单描述
*/
private String desc;
}

顺序消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Test
public void testOrderlyProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
List<Order> orderList = Arrays.asList(
new Order(1, 111, 59D, new Date(), "下订单"),
new Order(2, 111, 59D, new Date(), "物流"),
new Order(3, 111, 59D, new Date(), "签收"),
new Order(4, 112, 89D, new Date(), "下订单"),
new Order(5, 112, 89D, new Date(), "物流"),
new Order(6, 112, 89D, new Date(), "拒收")
);
// 循环集合开始发送
orderList.forEach(order -> {
Message message = new Message("TopicTest", order.toString().getBytes());
try {
// 发送的时候 相同的订单号选择同一个队列
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 当前主题有多少个队列
int queueNumber = mqs.size();
// 这个arg就是后面传入的 order.getOrderNumber()
Integer i = (Integer) arg;
// 用这个值去%队列的个数得到一个队列
int index = i % queueNumber;
// 返回选择的这个队列即可 ,那么相同的订单号 就会被放在相同的队列里 实现FIFO了
return mqs.get(index);
}
}, order.getOrderNumber());
} catch (Exception e) {
System.out.println("发送异常");
}
});
// 关闭实例
producer.shutdown();
}

顺序消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void testOrderlyConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
// 设置nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息
consumer.subscribe("TopicTest", "*");
// 注册一个消费监听 MessageListenerOrderly 是顺序消费 单线程消费
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
MessageExt messageExt = msgs.get(0);
System.out.println(new String(messageExt.getBody()));
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.in.read();
}

发送批量消息

Rocketmq可以一次性发送一组消息,那么这一组消息会被当做一个消息消费

批量消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void testBatchProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
List<Message> msgs = Arrays.asList(
new Message("TopicTest", "我是一组消息的A消息".getBytes()),
new Message("TopicTest", "我是一组消息的B消息".getBytes()),
new Message("TopicTest", "我是一组消息的C消息".getBytes())

);
SendResult send = producer.send(msgs);
System.out.println(send);
// 关闭实例
producer.shutdown();
}

批量消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void testBatchConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
// 设置nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来消费 表达式,默认是*
consumer.subscribe("TopicTest", "*");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 这里执行消费的代码 默认是多线程消费
System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}

发送事务消息

事务消息的发送流程

它可以被认为是一个两阶段的提交消息实现,以确保分布式系统的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子地执行

正常事务消息的发送及提交、事务消息的补偿流程

事务消息发送及提交

  1. 发送消息(half消息)
  2. 服务端响应消息写入结果
  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
  4. 根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见)

事务补偿

  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
  2. Producer收到回查消息,检查回查消息对应的本地事务的状态
  3. 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息UNKNOW或者Rollback发生超时或者失败的情况

事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态

事务消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* TransactionalMessageCheckService的检测频率默认1分钟,可通过在broker.conf文件中设置transactionCheckInterval的值来改变默认值,单位为毫秒。
* 从broker配置文件中获取transactionTimeOut参数值。
* 从broker配置文件中获取transactionCheckMax参数值,表示事务的最大检测次数,如果超过检测次数,消息会默认为丢弃,即回滚消息。
*
* @throws Exception
*/
@Test
public void testTransactionProducer() throws Exception {
// 创建一个事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("test-group");
producer.setNamesrvAddr("localhost:9876");
// 设置事务消息监听器
producer.setTransactionListener(new TransactionListener() {
// 这个是执行本地业务方法
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println(new Date());
System.out.println(new String(msg.getBody()));
// 这个可以使用try catch对业务代码进行性包裹
// COMMIT_MESSAGE 表示允许消费者消费该消息
// ROLLBACK_MESSAGE 表示该消息将被删除,不允许消费
// UNKNOW表示需要MQ回查才能确定状态 那么过一会 代码会走下面的checkLocalTransaction(msg)方法
return LocalTransactionState.UNKNOW;
}

// 这里是回查方法 回查不是再次执行业务操作,而是确认上面的操作是否有结果
// 默认是1min回查 默认回查15次 超过次数则丢弃打印日志 可以通过参数设置
// transactionTimeOut 超时时间
// transactionCheckMax 最大回查次数
// transactionCheckInterval 回查间隔时间单位毫秒
// 触发条件
// 1.当上面执行本地事务返回结果UNKNOW时,或者下面的回查方法也返回UNKNOW时 会触发回查
// 2.当上面操作超过20s没有做出一个结果,也就是超时或者卡主了,也会进行回查
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.err.println(new Date());
System.err.println(new String(msg.getBody()));
// 这里
return LocalTransactionState.UNKNOW;
}
});
producer.start();
Message message = new Message("TopicTest2", "我是一个事务消息".getBytes());
// 发送消息
producer.sendMessageInTransaction(message, null);
System.out.println(new Date());
System.in.read();
}

事务消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void testTransactionConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
// 设置nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息
consumer.subscribe("TopicTest2", "*");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 这里执行消费的代码 默认是多线程消费
System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}

RocketMQ发送带标签的消息,消息过滤

Rocketmq提供消息过滤功能,通过tag或者key进行区分

往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才区别

标签消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void testTagProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
Message msg = new Message("TopicTest","tagA", "我是一个带标记的消息".getBytes());
SendResult send = producer.send(msg);
System.out.println(send);
// 关闭实例
producer.shutdown();
}

标签消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
public void testTagConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
// 设置nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来消费 表达式,默认是*,支持"tagA || tagB || tagC" 这样或者的写法 只要是符合任何一个标签都可以消费
consumer.subscribe("TopicTest", "tagA || tagB || tagC");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 这里执行消费的代码 默认是多线程消费
System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
System.out.println(msgs.get(0).getTags());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}

什么时候该用 Topic,什么时候该用 Tag?

不同的业务应该使用不同的Topic如果是相同的业务里面有不同表的表现形式,那么要使用tag进行区分

可以从以下几个方面进行判断:

  1. 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分
  2. 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分
  3. 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分
  4. 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic

总的来说,针对消息分类,可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系

RocketMQ中消息的Key

在rocketmq中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用messageId或者key来进行查询

带key消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void testKeyProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
Message msg = new Message("TopicTest","tagA","key", "我是一个带标记和key的消息".getBytes());
SendResult send = producer.send(msg);
System.out.println(send);
// 关闭实例
producer.shutdown();
}

带key消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
public void testKeyConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
// 设置nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来消费 表达式,默认是*,支持"tagA || tagB || tagC" 这样或者的写法 只要是符合任何一个标签都可以消费
consumer.subscribe("TopicTest", "tagA || tagB || tagC");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 这里执行消费的代码 默认是多线程消费
System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
System.out.println(msgs.get(0).getTags());
System.out.println(msgs.get(0).getKeys());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}

RocketMQ重试机制

生产者重试

失败的情况重发3次 producer.setRetryTimesWhenSendFailed(3);

消息在1s内没有发送成功,就会重试 producer.send(msg, 1000);

消费者重试

在消费者放return ConsumeConcurrentlyStatus.RECONSUME_LATER;后就会执行重试

实际生产过程中,一般重试3-5次,如果还没有消费成功,则可以把消息签收了,通知人工等处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 测试消费者
*
* @throws Exception
*/
@Test
public void testConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
// 设置nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息
consumer.subscribe("TopicTest", "*");
// 注册一个消费监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
// 这里执行消费的代码
System.out.println(Thread.currentThread().getName() + "----" + msgs);
// 这里制造一个错误
int i = 10 / 0;
} catch (Exception e) {
// 出现问题 判断重试的次数
MessageExt messageExt = msgs.get(0);
// 获取重试的次数 失败一次消息中的失败次数会累加一次
int reconsumeTimes = messageExt.getReconsumeTimes();
if (reconsumeTimes >= 3) {
// 则把消息确认了,可以将这条消息记录到日志或者数据库 通知人工处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}

RocketMQ死信消息

当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列

当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。我们也可以去监听死信队列,然后进行自己的业务上的逻辑

消息生产者

1
2
3
4
5
6
7
8
9
@Test
public void testDeadMsgProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("dead-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message("dead-topic", "我是一个死信消息".getBytes());
producer.send(message);
producer.shutdown();
}

消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void testDeadMsgConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("dead-topic", "*");
// 设置最大消费重试次数 2 次
consumer.setMaxReconsumeTimes(2);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(msgs);
// 测试消费失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.in.read();
}

死信消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void testDeadMq() throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
consumer.setNamesrvAddr("localhost:9876");
// 消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
// 队列名称 默认是 %DLQ% + 消费者组名
consumer.subscribe("%DLQ%dead-group", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(msgs);
// 处理消息 签收了
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}

RocketMQ消息重复消费问题

为什么会出现重复消费问题?

BROADCASTING(广播)模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费,当然这个是根据需要来选择

CLUSTERING(负载均衡)模式下,如果一个topic被多个consumerGroup消费,也会重复消费

即使是在CLUSTERING模式下,同一个consumerGroup下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:一个消费者新上线后,同组的所有消费者要重新负载均衡(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的offset(偏移量,也就是消息消费的点位),此时之前的消费者可能已经消费了一条消息,但是并没有把offset提交给broker,那么新的消费者可能会重新消费一次。虽然orderly模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起concurrently要严格了,但是加锁的线程和提交offset的线程不是同一个,所以还是会出现极端情况下的重复消费

还有在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次

那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是msgid也可以是你自定义的唯一的key,这样就可以去重了

解决方案

使用去重方案解决,例如将消息的唯一标识存起来,然后每次消费之前先判断是否存在这个唯一标识,如果存在则不消费,如果不存在则消费,并且消费以后将这个标记保存

但是消息的体量是非常大的,可能在生产环境中会到达上千万甚至上亿条,那么我们该如何选择一个容器来保存所有消息的标识,并且又可以快速的判断是否存在呢?

我们可以选择布隆过滤器(BloomFilter)

布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难

在hutool的工具中我们可以直接使用https://hutool.cn/docs/#/bloomFilter/%E6%A6%82%E8%BF%B0**

测试生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void testRepeatProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
// 我们可以使用自定义key当做唯一标识
String keyId = UUID.randomUUID().toString();
System.out.println(keyId);
Message msg = new Message("TopicTest", "tagA", keyId, "我是一个测试消息".getBytes());
SendResult send = producer.send(msg);
System.out.println(send);
// 关闭实例
producer.shutdown();
}

添加hutool的依赖

1
2
3
4
5
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.11</version>
</dependency>

测试消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 在boot项目中可以使用@Bean在整个容器中放置一个单例对象
*/
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100);

@Test
public void testRepeatConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setMessageModel(MessageModel.BROADCASTING);
// 设置nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来消费 表达式,默认是*
consumer.subscribe("TopicTest", "*");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 拿到消息的key
MessageExt messageExt = msgs.get(0);
String keys = messageExt.getKeys();
// 判断是否存在布隆过滤器中
if (bloomFilter.contains(keys)) {
// 直接返回了 不往下处理业务
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 这个处理业务,然后放入过滤器中
// do sth...
bloomFilter.add(keys);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}

Rocketmq集成SpringBoot

搭建rocketmq-producer(消息生产者)

创建项目,完整的pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>powernode</groupId>
<artifactId>01-rocketmq-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketmq-producer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rocketmq的依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>

修改配置文件application.yml

1
2
3
4
5
6
7
8
9
10
spring:
application:
name: rocketmq-producer
rocketmq:
name-server: 127.0.0.1:9876 # rocketMq的nameServer地址
producer:
group: powernode-group # 生产者组别
send-message-timeout: 3000 # 消息发送的超时时间
retry-times-when-send-async-failed: 2 # 异步消息发送失败重试次数
max-message-size: 4194304 # 消息的最大长度

在测试类里面测试发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 注入rocketMQTemplate,我们使用它来操作mq
*/
@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
* 测试发送简单的消息
*
* @throws Exception
*/
@Test
public void testSimpleMsg() throws Exception {
// 往powernode的主题里面发送一个简单的字符串消息
SendResult sendResult = rocketMQTemplate.syncSend("powernode", "我是一个简单的消息");
// 拿到消息的发送状态
System.out.println(sendResult.getSendStatus());
// 拿到消息的id
System.out.println(sendResult.getMsgId());
}

搭建rocketmq-consumer(消息消费者)

创建项目,完整的pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>powernode</groupId>
<artifactId>02-rocketmq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketmq-consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rocketmq的依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>

修改配置文件application.yml

1
2
3
4
5
spring:
application:
name: rocketmq-consumer
rocketmq:
name-server: 127.0.0.1:9876

添加一个监听的类SimpleMsgListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 创建一个简单消息的监听
* 1.类上添加注解@Component@RocketMQMessageListener
* @RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group")
* topic指定消费的主题,consumerGroup指定消费组,一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
* 2.实现RocketMQListener接口,注意泛型的使用,可以为具体的类型,如果想拿到消息
* 的其他参数可以写成MessageExt
*/
@Component
@RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group",messageModel = MessageModel.CLUSTERING)
public class SimpleMsgListener implements RocketMQListener<String> {

/**
* 消费消息的方法
*
* @param message
*/
@Override
public void onMessage(String message) {
System.out.println(message);
}
}

启动rocketmq-consumer

RocketMQ发送对象消息和集合消息

发送对象消息

是监听的时候泛型中写对象的类型即可

修改rocketmq-producer添加一个Order类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 订单对象
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
/**
* 订单号
*/
private String orderId;

/**
* 订单名称
*/
private String orderName;

/**
* 订单价格
*/
private Double price;

/**
* 订单号创建时间
*/
private Date createTime;
/**
* 订单描述
*/
private String desc;

}

添加一个单元测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 测试发送对象消息
*
* @throws Exception
*/
@Test
public void testObjectMsg() throws Exception {
Order order = new Order();
order.setOrderId(UUID.randomUUID().toString());
order.setOrderName("我的订单");
order.setPrice(998D);
order.setCreateTime(new Date());
order.setDesc("加急配送");
// 往powernode-obj主题发送一个订单对象
rocketMQTemplate.syncSend("powernode-obj", order);
}

发送消息

修改rocketmq-consumer也添加一个Order类

修改rocketmq-consumer添加一个ObjMsgListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 创建一个对象消息的监听
* 1.类上添加注解@Component@RocketMQMessageListener
* 2.实现RocketMQListener接口,注意泛型的使用
*/
@Component
@RocketMQMessageListener(topic = "powernode-obj", consumerGroup = "powernode-obj-group")
public class ObjMsgListener implements RocketMQListener<Order> {

/**
* 消费消息的方法
*
* @param message
*/
@Override
public void onMessage(Order message) {
System.out.println(message);
}
}

重启rocketmq-consumer后查看控制台

发送集合消息

和对象消息同理,创建一个Order的集合,发送出去,监听方注意修改泛型中的类型为Object即可

RocketMQ集成SpringBoot发送不同消息模式

发送同步消息

理解为:消息由消费者发送到broker后,会得到一个确认,是具有可靠性的

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知等

底层调用syncSend,发送的是同步消息

发送异步消息

rocketMQTemplate.asyncSend()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 测试异步发送消息
*
* @throws Exception
*/
@Test
public void testAsyncSend() throws Exception {
// 发送异步消息,发送完以后会有一个异步通知
rocketMQTemplate.asyncSend("powernode", "发送一个异步消息", new SendCallback() {
/**
* 成功的回调
* @param sendResult
*/
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}

/**
* 失败的回调
* @param throwable
*/
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
// 测试一下异步的效果
System.out.println("谁先执行");
// 挂起jvm 不让方法结束
System.in.read();
}

发送单向消息

这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送

1
2
3
4
5
6
7
8
9
10
/**
* 测试单向消息
*
* @throws Exception
*/
@Test
public void testOnWay() throws Exception {
// 发送单向消息,没有返回值和结果
rocketMQTemplate.sendOneWay("powernode", "这是一个单向消息");
}

发送延迟消息

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 测试延迟消息
*
* @throws Exception
*/
@Test
public void testDelay() throws Exception {
// 构建消息对象
Message<String> message = MessageBuilder.withPayload("我是一个延迟消息").build();
// 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费
SendResult sendResult = rocketMQTemplate.syncSend("powernode", message, 2000, 4);
System.out.println(sendResult.getSendStatus());
}

RocketMQ不支持任意时间的延时

只支持以下几个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

发送顺序消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 订单的流程顺序
*/
private Integer seq;



/**
* 测试顺序消费
* mq会根据hash的值来存放到一个队列里面去
*
* @throws Exception
*/
@Test
public void testOrderly() throws Exception {
List<Order> orders = Arrays.asList(
new Order(UUID.randomUUID().toString().substring(0, 5), "张三的下订单", null, null, null, 1),
new Order(UUID.randomUUID().toString().substring(0, 5), "张三的发短信", null, null, null, 1),
new Order(UUID.randomUUID().toString().substring(0, 5), "张三的物流", null, null, null, 1),
new Order(UUID.randomUUID().toString().substring(0, 5), "张三的签收", null, null, null, 1),

new Order(UUID.randomUUID().toString().substring(0, 5), "李四的下订单", null, null, null, 2),
new Order(UUID.randomUUID().toString().substring(0, 5), "李四的发短信", null, null, null, 2),
new Order(UUID.randomUUID().toString().substring(0, 5), "李四的物流", null, null, null, 2),
new Order(UUID.randomUUID().toString().substring(0, 5), "李四的签收", null, null, null, 2)
);
// 我们控制流程为 下订单->发短信->物流->签收 hash的值为seq,也就是说 seq相同的会放在同一个队列里面,顺序消费
orders.forEach(order -> {
rocketMQTemplate.syncSendOrderly("powernode-obj", order, String.valueOf(order.getSeq()));
});
}


/**
* 创建一个对象消息的监听
* 1.类上添加注解@Component@RocketMQMessageListener
* 2.实现RocketMQListener接口,注意泛型的使用
* consumeMode 指定消费类型
* CONCURRENTLY 并发消费
* ORDERLY 顺序消费 messages orderly. one queue, one thread
*/
@Component
@RocketMQMessageListener(topic = "powernode-obj",
consumerGroup = "powernode-obj-group",
consumeMode = ConsumeMode.ORDERLY
)
public class ObjMsgListener implements RocketMQListener<Order> {

/**
* 消费消息的方法
*
* @param message
*/
@Override
public void onMessage(Order message) {
System.out.println(message);
}
}

发送事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 测试事务消息
* 默认是sync(同步的)
* 事务消息会有确认和回查机制
* 事务消息都会走到同一个监听回调里面,所以我们需要使用tag或者key来区分过滤
*
* @throws Exception
*/
@Test
public void testTrans() throws Exception {
// 构建消息体
Message<String> message = MessageBuilder.withPayload("这是一个事务消息").build();
// 发送事务消息(同步的) 最后一个参数才是消息主题
TransactionSendResult transaction = rocketMQTemplate.sendMessageInTransaction("powernode", message, "消息的参数");
// 拿到本地事务状态
System.out.println(transaction.getLocalTransactionState());
// 挂起jvm,因为事务的回查需要一些时间
System.in.read();
}





// 添加一个本地事务消息的监听(半消息)
/**
* 事务消息的监听与回查
* 类上添加注解@RocketMQTransactionListener 表示这个类是本地事务消息的监听类
* 实现RocketMQLocalTransactionListener接口
* 两个方法为执行本地事务,与回查本地事务
*/
@Component
@RocketMQTransactionListener(corePoolSize = 4,maximumPoolSize = 8)
public class TmMsgListener implements RocketMQLocalTransactionListener {

/**
* 执行本地事务,这里可以执行一些业务
* 比如操作数据库,操作成功就return RocketMQLocalTransactionState.COMMIT;
* 可以使用try catch来控制成功或者失败;
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 拿到消息参数
System.out.println(arg);
// 拿到消息头
System.out.println(msg.getHeaders());
// 返回状态COMMIT,UNKNOWN
return RocketMQLocalTransactionState.UNKNOWN;
}

/**
* 回查本地事务,只有上面的执行方法返回UNKNOWN时,才执行下面的方法 默认是1min回查
* 此方法为回查方法,执行需要等待一会
* xxx.isSuccess() 这里可以执行一些检查的方法
* 如果返回COMMIT,那么本地事务就算是提交成功了,消息就会被消费者看到
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println(msg);
return RocketMQLocalTransactionState.COMMIT;
}
}
  1. 消息会先到事务监听类的执行方法,
  2. 如果返回状态为COMMIT,则消费者可以直接监听到
  3. 如果返回状态为ROLLBACK,则消息发送失败,直接回滚
  4. 如果返回状态为UNKNOW,则过一会会走回查方法
  5. 如果回查方法返回状态为UNKNOW或者ROLLBACK,则消息发送失败,直接回滚
  6. 如果回查方法返回状态为COMMIT,则消费者可以直接监听到

RocketMQ集成SpringBoot的消息过滤

tag过滤(常在消费者端过滤)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 发送一个带tag的消息
*
* @throws Exception
*/
@Test
public void testTagMsg() throws Exception {
// 发送一个tag为java的数据
rocketMQTemplate.syncSend("powernode-tag:java", "我是一个带tag的消息");
}


/**
* 创建一个简单的标签消息的监听
* 1.类上添加注解@Component@RocketMQMessageListener
* selectorType = SelectorType.TAG, 指定使用tag过滤。(也可以使用sql92 需要在配置文件broker.conf中开启enbalePropertyFilter=true)
* selectorExpression = "java" 表达式,默认是*,支持"tag1 || tag2 || tag3"
* 2.实现RocketMQListener接口,注意泛型的使用
*/
@Component
@RocketMQMessageListener(topic = "powernode-tag",
consumerGroup = "powernode-tag-group",
selectorType = SelectorType.TAG,
selectorExpression = "java"
)
public class TagMsgListener implements RocketMQListener<String> {

/**
* 消费消息的方法
*
* @param message
*/
@Override
public void onMessage(String message) {
System.out.println(message);
}
}

key过滤(可以在事务监听的类里面区分)

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 发送一个带key的消息,我们使用事务消息 打断点查看消息头
*
* @throws Exception
*/
@Test
public void testKeyMsg() throws Exception {
// 发送一个key为spring的事务消息
Message<String> message = MessageBuilder.withPayload("我是一个带key的消息")
.setHeader(RocketMQHeaders.KEYS, "spring")
.build();
rocketMQTemplate.sendMessageInTransaction("powernode", message, "我是一个带key的消息");
}

RocketMQ集成SpringBoot消息消费两种模式

Rocketmq消息消费的模式分为两种:负载均衡模式和广播模式

负载均衡模式表示多个消费者交替消费同一个主题里面的消息

广播模式表示每个每个消费者都消费一遍订阅的主题的消息

rocketmq-consumer-b添加一个监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* messageModel 指定消息消费的模式
* CLUSTERING 为负载均衡模式
* BROADCASTING 为广播模式
*/
@Component
@RocketMQMessageListener(topic = "powernode",
consumerGroup = "powernode-group",
messageModel = MessageModel.CLUSTERING
)
public class ConsumerBListener implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
System.out.println(message);
}
}

修改rocketmq-consumer的SimpleMsgListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 创建一个简单消息的监听
* 1.类上添加注解@Component@RocketMQMessageListener
*
* @RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group")
* topic指定消费的主题,consumerGroup指定消费组,一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
* 2.实现RocketMQListener接口,注意泛型的使用
*/
@Component
@RocketMQMessageListener(topic = "powernode",
consumerGroup = "powernode-group",
messageModel = MessageModel.CLUSTERING)
public class SimpleMsgListener implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
System.out.println(new Date());
System.out.println(message);
}
}

启动两个消费者

在生产者里面添加一个单元测试

1
2
3
4
5
6
7
8
9
10
11
/**
* 测试消息消费的模式
*
* @throws Exception
*/
@Test
public void testMsgModel() throws Exception {
for (int i = 0; i < 10; i++) {
rocketMQTemplate.syncSend("powernode", "我是消息" + i);
}
}

修改两个消费者的模式为BROADCASTING

重启测试,结果是广播模式,每个消费者都消费了这些消息

重置消费点位:将一个组的消费节点 设置在之前的某一个时间点上去 从这个时间点开始往后消费

跳过堆积:选择一个组 跳过堆积以后 这个组里面的的所有都不会被消费了

如何解决消息堆积问题

一般认为单条队列消息差值>=10w时 算堆积问题

什么情况下会出现堆积

  1. 生产太快了

    生产方可以做业务限流

    增加消费者数量,但是消费者数量<=队列数量,适当的设置最大的消费线程数量(根据IO(2n)/CPU(n+1))

    动态扩容队列数量,从而增加消费者数量

  2. 消费者消费出现问题

    排查消费者程序的问题

如何确保消息不丢失

  1. 生产者使用同步发送模式 ,收到mq的返回确认以后 顺便往自己的数据库里面写 msgId status(0) time
  2. 消费者消费以后 修改数据这条消息的状态 = 1
  3. 写一个定时任务 间隔两天去查询数据 如果有status = 0 and time < day-2
  4. 将mq的刷盘机制设置为同步刷盘
  5. 使用集群模式 ,搞主备模式,将消息持久化在不同的硬件上
  6. 可以开启mq的trace机制,消息跟踪机制
  • 在broker.conf中开启消息追踪 traceTopicEnable=true

    重启broker即可

  • 生产者配置文件开启消息轨迹enable-msg-trace: true

    消费者开启消息轨迹功能,可以给单独的某一个消费者开启enableMsgTrace = true

    在rocketmq的面板中可以查看消息轨迹

    默认会将消息轨迹的数据存在 RMQ_SYS_TRACE_TOPIC 主题里面

安全

  1. 开启acl的控制 在broker.conf中开启aclEnable=true

  2. 配置账号密码 修改plain_acl.yml

  3. 修改控制面板的配置文件 放开52/53行 把49行改为true

    上传到服务器的jar包平级目录下即可

秒杀

技术选择型

  • Springboot 接收请求并操作redis和mysql
  • Redis 用于缓存+分布式锁(setnx+自旋),也可以使用lua脚本(原子性)
  • Rocketmq 用于解耦 削峰,异步
  • Mysql 用于存放真实的商品信息
  • Mybatis 用于操作数据库的orm框架

架构图

准备工作-数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for goods
-- ----------------------------
DROP TABLE IF EXISTS `goods`;
CREATE TABLE `goods` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`goods_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL,
`price` decimal(10, 2) NULL DEFAULT NULL,
`stocks` int(255) NULL DEFAULT NULL,
`status` int(255) NULL DEFAULT NULL,
`pic` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL,
`create_time` datetime(0) NULL DEFAULT NULL,
`update_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of goods
-- ----------------------------
INSERT INTO `goods` VALUES (1, '小米12s', 4999.00, 1000, 2, 'xxxxxx', '2023-02-23 11:35:56', '2023-02-23 16:53:34');
INSERT INTO `goods` VALUES (2, '华为mate50', 6999.00, 10, 2, 'xxxx', '2023-02-23 11:35:56', '2023-02-23 11:35:56');
INSERT INTO `goods` VALUES (3, '锤子pro2', 1999.00, 100, 1, NULL, '2023-02-23 11:35:56', '2023-02-23 11:35:56');

-- ----------------------------
-- Table structure for order_records
-- ----------------------------
DROP TABLE IF EXISTS `order_records`;
CREATE TABLE `order_records` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) NULL DEFAULT NULL,
`order_sn` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL,
`goods_id` int(11) NULL DEFAULT NULL,
`create_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

创建项目选择依赖spike-web(接受用户秒杀请求)

Pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.13</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.powernode</groupId>
<artifactId>spike-web</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spike-web</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- rocketmq的依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.14</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>

修改配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
server:
port: 7001
spring:
application:
name: spike-web
redis:
host: 127.0.0.1
port: 6379
database: 0
lettuce:
pool:
enabled: true
max-active: 100
max-idle: 20
min-idle: 5
rocketmq:
name-server: 192.168.188.129:9876 # rocketMq的nameServer地址
producer:
group: powernode-group # 生产者组别
send-message-timeout: 3000 # 消息发送的超时时间
retry-times-when-send-async-failed: 2 # 异步消息发送失败重试次数
max-message-size: 4194304 # 消息的最大长度

创建SpikeController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@RestController
public class SeckillController {


@Autowired
private StringRedisTemplate redisTemplate;

@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
* 压测时自动是生成用户id
*/
AtomicInteger ai = new AtomicInteger(0);

/**
* 1.一个用户针对一种商品只能抢购一次
* 2.做库存的预扣减 拦截掉大量无效请求
* 3.放入mq 异步化处理订单
*
* @return
*/
@GetMapping("doSeckill")
public String doSeckill(Integer goodsId /*, Integer userId*/) {
int userId = ai.incrementAndGet();
// unique key 唯一标记 去重
String uk = userId + "-" + goodsId;
// set nx set if not exist
Boolean flag = redisTemplate.opsForValue().setIfAbsent("seckillUk:" + uk, "");
if (!flag) {
return "您以及参与过该商品的抢购,请参与其他商品抢购!";
}
// 假设库存已经同步了 key:goods_stock:1 val:10
Long count = redisTemplate.opsForValue().decrement("goods_stock:" + goodsId);
// getkey java setkey 先查再写 再更新 有并发安全问题
if (count < 0) {
return "该商品已经被抢完,请下次早点来哦O(∩_∩)O";
}
// 放入mq
HashMap<String, Integer> map = new HashMap<>(4);
map.put("goodsId", goodsId);
map.put("userId", userId);
rocketMQTemplate.asyncSend("seckillTopic3", JSON.toJSONString(map), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功" + sendResult.getSendStatus());
}

@Override
public void onException(Throwable throwable) {
System.err.println("发送失败" + throwable);
}
});
return "拼命抢购中,请稍后去订单中心查看";
}

}

创建项目选择依赖spike-service(处理秒杀)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.13</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.powernode</groupId>
<artifactId>spike-service</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spike-service</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>

<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.6</version>
</dependency>
<!-- rocketmq的依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.14</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>

修改yml文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
server:
port: 7002
spring:
application:
name: spike-service
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/spike?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
username: root
password: 123456
type: com.alibaba.druid.pool.DruidDataSource
redis:
host: 127.0.0.1
port: 6379
database: 0
lettuce:
pool:
enabled: true
max-active: 100
max-idle: 20
min-idle: 5
mybatis:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
mapper-locations: classpath*:mapper/*.xml
rocketmq:
name-server: 192.168.188.129:9876

逆向生成实体类等

修改启动类

1
2
3
4
5
6
7
8
@SpringBootApplication
@MapperScan(basePackages = {"com.powernode.mapper"})
public class SpikeServiceApplication {

public static void main(String[] args) {
SpringApplication.run(SpikeServiceApplication.class, args);
}
}

修改GoodsMapper

1
List<Goods> selectSeckillGoods();

修改GoodsMapper.xml

1
2
3
4
5
<!--  查询数据库中需要参于秒杀的商品数据 status = 2 -->
<select id="selectSeckillGoods" resultMap="BaseResultMap">
select `id`,`stocks` from goods where `status` = 2
</select>

同步mysql数据到redis

方法一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 将mysql的参与抢购的商品的数据
* 同步到redis里面去
* 在上游服务需要使用redis来做库存的预扣减
*/
@Component
public class DataSyncConfig {

@Autowired
private GoodsMapper goodsMapper;

@Autowired
private StringRedisTemplate redisTemplate;

// 业务场景是搞一个定时任务 每天10点开启
// 为了 测试方便 项目已启动就执行一次

/**
* spring bean的生命周期
* 在当前对象 实例化完以后
* 属性注入以后
* 执行 PostConstruct 注解的方法
*/
@PostConstruct
@Scheduled(cron = "0 10 0 0 0 ?")
public void initData() {
List<Goods> goodsList = goodsMapper.selectSeckillGoods();
if (CollectionUtils.isEmpty(goodsList)) {
return;
}
goodsList.forEach(goods -> redisTemplate.opsForValue().set("goods_stock:" + goods.getId(), goods.getStocks().toString()));
}
}

方法二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Component
public class MySqlToRedis2 implements CommandLineRunner {


@Resource
private GoodsMapper goodsMapper;

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public void run(String... args) throws Exception {
initData();
}

private void initData() {
//1,查询数据库中需要参于秒杀的商品数据
List<Goods> goodsList = goodsMapper.querySpikeGoods();
ValueOperations<String, String> operations = stringRedisTemplate.opsForValue();
// //2,把数据同步到Redis
for (Goods goods : goodsList) {
operations.set("goods:" + goods.getGoodsId(), goods.getTotalStocks().toString());
}
}
}

创建秒杀监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* 时间 14:27
* 默认负载均衡模式
* 默认多线程消费
*/
@Component
@RocketMQMessageListener(topic = "seckillTopic3", consumerGroup = "seckill-consumer-group")
public class SeckillMsgListener implements RocketMQListener<MessageExt> {

@Autowired
private GoodsService goodsService;

@Autowired
private StringRedisTemplate redisTemplate;

// 20s
int time = 20000;

@Override
public void onMessage(MessageExt message) {
String s = new String(message.getBody());
JSONObject jsonObject = JSON.parseObject(s);
Integer goodsId = jsonObject.getInteger("goodsId");
Integer userId = jsonObject.getInteger("userId");
// 做真实的抢购业务 减库存 写订单表 todo 答案2 但是不符合分布式
// synchronized (SeckillMsgListener.class) {
// goodsService.realDoSeckill(goodsId, userId);
// }
// 自旋锁 一般 mysql 每秒1500/s写 看数量 合理的设置自旋时间 todo 答案3
int current = 0;
while (current <= time) {
// 一般在做分布式锁的情况下 会给锁一个过期时间 防止出现死锁的问题
// 也可以使用redisson(锁自动过期,锁超时,公平锁,可重入锁)
Boolean flag = redisTemplate.opsForValue().setIfAbsent("goods_lock:" + goodsId, "", 10, TimeUnit.SECONDS);
if (flag) {
try {
goodsService.realDoSeckill(goodsId, userId);
return;
} finally {
redisTemplate.delete("goods_lock:" + goodsId);
}
} else {
current += 200;
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

修改GoodsService

1
void realDoSeckill(Integer goodsId, Integer userId);

修改GoodsServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Resource
private GoodsMapper goodsMapper;

@Autowired
private OrderRecordsMapper orderRecordsMapper;

/**
*
* @param goodsId
* @param userId
*/
@Override
@Transactional(rollbackFor = RuntimeException.class)
public void realDoSeckill(Integer goodsId, Integer userId) {
// 扣减库存 插入订单表
Goods goods = goodsMapper.selectByPrimaryKey(goodsId);
int finalStock = goods.getStocks() - 1;
if (finalStock < 0) {
// 只是记录日志 让代码停下来 这里的异常用户无法感知
throw new RuntimeException("库存不足:" + goodsId);
}
goods.setStocks(finalStock);
goods.setUpdateTime(new Date());
// insert 要么成功 要么报错 update 会出现i<=0的情况
// update goods set stocks = 1 where id = 1 没有行锁
int i = goodsMapper.updateByPrimaryKey(goods);
if (i > 0) {
// 写订单表
OrderRecords orderRecords = new OrderRecords();
orderRecords.setGoodsId(goodsId);
orderRecords.setUserId(userId);
orderRecords.setCreateTime(new Date());
// 时间戳生成订单号
orderRecords.setOrderSn(String.valueOf(System.currentTimeMillis()));
orderRecordsMapper.insert(orderRecords);
}
}

/**
* mysql行锁 innodb 行锁
* 分布式锁
* todo 答案1
*
* @param goodsId
* @param userId
*/
// @Override
// @Transactional(rollbackFor = RuntimeException.class)
// public void realDoSeckill(Integer goodsId, Integer userId) {
// // update goods set stocks = stocks - 1 ,update_time = now() where id = #{value}
// int i = goodsMapper.updateStocks(goodsId);
// if (i > 0) {
// // 写订单表
// OrderRecords orderRecords = new OrderRecords();
// orderRecords.setGoodsId(goodsId);
// orderRecords.setUserId(userId);
// orderRecords.setCreateTime(new Date());
// // 时间戳生成订单号
// orderRecords.setOrderSn(String.valueOf(System.currentTimeMillis()));
// orderRecordsMapper.insert(orderRecords);
// }
// }