消息队列

什么是消息队列

消息队列(Message Queue),即MQ,是一种在分布式系统中用于在不同组件或服务之间传递消息的通信模式。多个进程可同时向一个消息队列发送消息,也可以同时从一个消息队列中接收消息。发送进程把消息发送到队列尾部,接受进程从消息队列头部读取消息,消息一旦被读出就从队列中删除。

为什么要使用消息队列

解耦

解耦,即降低耦合度(通俗一点讲就是降低系统之间的依赖程度)。

这里引入一个博主讲的例子【系统A是一个关键性的系统,产生数据后需要通知到系统B和系统C做响应的反应,三个系统都写好了,稳定运行;某一天,系统D也需要在系统A产生数据后作出反应,那就得系统A改代码,去调系统D的接口,好,改完了,上线了。假设过了某段时间,系统C因为某些原因,不需要作出反应了,不要系统A调它接口了,就让系统A把调接口的代码删了,系统A的负责人肯定会很烦,改来改去,不停地改,不同地测,还得看会不会影响系统B,系统D。没办法,这种架构下,就是这样麻烦。而且这样还没考虑异常情况,假如系统A产生了数据,本来需要实时调系统B的,结果系统B宕机了或重启了,没调成功咋办,或者调用返回失败怎么办,系统A是不是要考虑要不要重试?还要开发一套重试机制,系统A要考虑的东西也太多了吧。】

使用MQ后,系统A只需要将数据写入MQ中,而不需要关心系统B,C,D的消费情况。这样各系统之间的依赖程度大大降低了。

异步

异步?同步?就是需要等对方完成后自己才能进行下一步的叫同步,互不干扰一起进行的叫异步。没有使用MQ时,系统A需要向系统B,C,D发送数据,这个事件才算完成。这其中耗费的事件会降低用户的使用体验。而在用了MQ后,系统A只需要将数据写入MQ后,就可以响应了,大大节省了时间。

削峰

举个例子,双十一大促的时候,订单峰值达到了54.4万笔/秒,如果不加限制,数据库基本会崩溃,随之到来的是系统直接异常。而在程序员修复的这段时间,每秒会流失多少订单,你可以想象一下。使用MQ后,用户下单后系统A只管往MQ中写数据,而系统B,C,D等等可以设置消费的频率,不管一下有多少订单,我只按照我自己的节奏处理数据。这样就很有效的保护了系统,也提高了吞吐量。

几个常见的消息队列对比

  • Pulsar:是下一代云原生分布式消息流平台,集消息,存储,轻量化函数式计算为一体,采用存算分离的架构设计
  • BMQ:和Pulsar架构类似,存算分离,初期定位是承接高吞吐的离线业务场景,逐步替换掉对应的kafka集群

kafka

主要应用

搜索服务,直播服务,订单服务,支付服务

基本概念

  • Producer:即生产者,消息的入口。
  • Consumer:即消费者,负责消费 Topic 中的消息
  • ConsumerGroup:消费者组,不同组 Consumer 消费进度互不干涉
  • Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,每个kafka集群内的broker都有一个不重复的编号,如broker-0、broker-1等等
  • Topic:消息的主题,kafka的数据就保存在topic。在每个broker上可以创建多个topic。
  • Partition:topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹

每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息

  • Replication:每一个分区都有多个副本。当Leader故障的时候会选择一个Follower,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,Follower和Leader是在不同的机器上的,同一机器对同一个分区也只可能存放一个副本。

  • Offset:消息在 partition 内的相对位置信息,可以理解为唯一 ID,在 partition 内部严格递增

  • Coordinator:协调不同组件之间工作的实体

    • Group Coordinator(消费者组协调器):负责管理消费者组(Consumer Group)的协调工作。它负责维护消费者组的成员信息、分区分配以及处理消费者的加入和离开等操作。消费者在消费消息时需要与 Group Coordinator 进行交互
    • Transaction Coordinator(事务协调器):负责管理事务性生产者(Transactional Producer)的事务。它协调生产者端发送消息的事务性操作,确保消息的原子性和一致性。Transaction Coordinator 还负责将事务性消息的提交和回滚操作同步到所有相关的分区
  • Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

提高性能

  • Producer:批量发送、数据压缩
  • Broker:顺序写,消息索引(可以使用二分查找),零拷贝
  • Consumer:Reblance

Rebalance(重新平衡)是指在消费者组(Consumer Group)中新增或移除消费者时,Kafka 会重新分配分区(Partitions)给消费者,以确保每个消费者负责处理的分区数尽可能均衡。当消费者加入消费者组或者从消费者组中移除时,Kafka 会触发一次 Rebalance 操作。在 Rebalance 过程中,Kafka 将重新分配各个分区给不同的消费者,以确保每个消费者处理的分区数量大致相等,从而实现负载均衡和提高整体性能。通过重新平衡,Kafka 可以有效地调整消费者之间的负载,避免某些消费者处理过多的分区,而其他消费者却处于空闲状态。这有助于提高整个消费者组的处理能力,并确保消息能够被及时消费。

问题

  • 运维成本高

Kafka 在部署和维护过程中需要一定的专业知识和经验,包括配置管理、性能优化、监控和故障排除等方面的工作。此外,Kafka 集群通常由多个节点组成,需要进行规划和协调,因此运维成本相对较高

  • 对于负载不均衡的场景,解决方案复杂

当 Kafka 集群中出现负载不均衡的情况时,需要重新分配分区以实现负载均衡。这可能涉及到重新配置消费者组或者调整分区分配策略,而这些操作相对复杂且需要谨慎地进行规划和执行

  • 没有自己的缓存,完全依赖 Page cache

Kafka 的设计中并没有包含专门的缓存层,而是完全依赖操作系统的文件系统和磁盘缓存(Page Cache)。这意味着 Kafka 不能直接控制缓存的管理和使用,而是受限于操作系统的缓存机制

  • Controller 和 Coordinator 和 Broker 在同一进程中,大量 IO 会造成其性能下降

Kafka 中的 Controller、Coordinator 和 Broker 是在同一个进程中运行的,当遇到大量的 IO 操作时,可能会造成性能下降。这是因为它们共享了同一进程的资源,可能会相互影响,导致性能问题

下载

注意以下命令请在linux中使用,以Ubuntu为例子

校验java环境是否已安装

1
2
3
4
5
java -version
//如果未安装
sudo apt-get update
sudo apt-get install default-jre
java -version

安装zooKeeper

Apache ZooKeeper

1
2
3
4
5
tar -zxvf ./apache-zookeeper-3.9.1-bin.tar.gz -C ./opt/module/
mv zoo_sample.cfg zoo.cfg //在conf目录下
vim zoo.cfg
//将dataDir更换为 dataDir=/opt/module/apache-zookeeper-3.5.7-bin/zkData zkData自行创建
mkdir zkData

下载Kafka

Apache Kafka

1
2
3
4
5
6
tar -zxvf ./kafka_2.13-3.7.0.tgz -C ./opt/module/
cd /opt/module/kafka_2.13-3.7.0/config/
vim server.properties
//将listeners修改 listeners=PLAINTEXT://LOCALHOST:9092
//将log.dirs修改 log.dirs=/opt/module/kafka_2.13-3.7.0/log.dirs
mkdir log.dirs

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//注意需进入对应的目录
//启动zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties //查看
jps
//启动kafka
bin/kafka-server-start.sh config/server.properties
//查看
jps
//查看所有主题
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
//创建test
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic test
//删除test
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test
//启动生产者
bin/kafka-console-producer.sh --broker-list --bootstrap-server localhost:9092 --topic test
//启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
//注意生产者和消费者建议开两个窗口,实时输出消费,这样更直观
//结束

Golang中如何正确的使用sarama包操作Kafka

在实际场景中可能会出现“消息丢失”,重复消费,乱序等问题,本篇不详细说明,具体可到这篇博客学习传送门,下面给出示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//开始之前
go get -u github.com/IBM/sarama
//consumer.go
package main

import (
"fmt"
"log"
"github.com/IBM/sarama"
)

func main() {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true

consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalf("Failed to start consumer: %v", err)
}
defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
if err != nil {
log.Fatalf("Failed to start partition consumer: %v", err)
}
defer partitionConsumer.Close()

for {
select {
case msg := <-partitionConsumer.Messages():
fmt.Printf("Received message: %s\n", string(msg.Value))
case err := <-partitionConsumer.Errors():
log.Printf("Error: %v\n", err)
}
}
}
//producer.go
//同步
package main

import (
"fmt"
"github.com/IBM/sarama"
)

func main() {
config := sarama.NewConfig()
// 1. 初始化生产者配置
config.Producer.RequiredAcks = sarama.WaitForAll
// 选择分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 成功交付的信息
config.Producer.Return.Successes = true

// 2. 构造一个消息,结构体类型
msg := &sarama.ProducerMessage{
Topic: "test",
Value: sarama.StringEncoder("hello world"),
}

// 3. 连接kafka
client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Println(err)
}
defer client.Close()
// 4. 发送消息
partition, offset, err := client.SendMessage(msg)
fmt.Println("partition:")
fmt.Println(partition)
fmt.Println("offset:")
fmt.Println(offset)
if err != nil {
fmt.Println(err)
}
}
//异步
package main

import (
"fmt"
"github.com/IBM/sarama"
)

func main() {
config := sarama.NewConfig()
// 1. 初始化生产者配置
config.Producer.RequiredAcks = sarama.WaitForAll
// 选择分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 成功交付的信息
config.Producer.Return.Successes = true

// 2. 构造一个消息,结构体类型
msg := &sarama.ProducerMessage{
Topic: "test",
Value: sarama.StringEncoder("hello_jh"),
}

// 3. 连接kafka
client, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Println(err)
}
defer client.Close()
// 4. 发送消息
client.Input()<-msg

select {
case success := <-client.Successes():
log.Printf("Message sent successfully: %v\n", success)
case err := <-client.Errors():
log.Fatalf("Error sending message: %v\n", err)
}
}
//同步发送消息需要等待发送结果,因此需要通过返回值(如 SendMessage 的返回值)来获取发送结 //果,以确保消息发送成功。而异步发送消息不会阻塞当前线程,可以通过 channel 监听发送成功或失
//败的回调来处理结果。

Kafka入门(3):Sarama生产者是如何工作的