文章目录
  1. 1. 基本概念
    1. 1.1. 关于顺序和分区
    2. 1.2. 关于副本
    3. 1.3. 关于丢消息和重复收到消息
  2. 2. Kafka 的工具和编程接口
    1. 2.1. Kafka 的工具
    2. 2.2. Kafka 的 Java API
  3. 3. Kafka 副本和集群
  4. 4. Kafka 最佳实践
  5. 5. 后记

现代的互联网分布式系统,只要稍微大一些,就一定逃不开3类中间件:远程调用(RPC)框架、消息队列、数据库访问中间件。Kafka 是消息队列中间件的代表产品,用 Scala 语言实现,本文采用的是 Kafka_2.11 0.10.0.0 版本进行实验。

基本概念

首先,Kafka 中有一些基本的概念需要熟悉 1 2

  • Topic,指消息的类别,每个消息都必须有;
  • Producer,指消息的产生者,或者,消息的写端;
  • Consumer,指消息的消费者,或者,消息的读端;
  • Producer Group,指产生者组,组内的生产者产生同一类消息;
  • Consumer Group,指消费者组,组内的消费者消费同一类消息;
  • Broker,指消息服务器,Producer 产生的消息都是写到这里,Consumer 读消息也是从这里读;
  • Zookeeper,是 Kafka 的注册中心,Broker 和 Consumer 之间的协调器,包含状态信息、配置信息和一些 Topic 的信息;
  • Partition,指消息的水平分区,一个 Topic 可以有多个分区;
  • Replica,指消息的副本,为了提高可用性,将消息副本保存在其他 Broker 上;

特别说明,Broker 是指单个消息服务进程,一般情况下,Kafka 是集群运行的,Broker 只是集群中的一个服务进程,而非代指整个 Kafka 服务,可以简单将 Broker 理解成服务器(Server)。Kafka 引入的术语都比较常见,从字面上理解相对直观。Kafka 的大致结构图是这样,

Kafka 是 Pull 模式的消息队列,即 Consumer 连到消息队列服务上,主动请求新消息,如果要做到实时性,需要采用长轮询,Kafka 在0.8的时候已经支持长轮询模式。上图中 Consumer 的连接箭头方向可能会让读者误以为是 Push 模式,特此注明。更多关于 Kafka 设计的文章可以参考官方文档,或者一些比较好的博客文章 3

关于顺序和分区

Kafka 是一个力求保持消息顺序性的消息队列,但不是完全保证,其保证的是 Partition 级别的顺序性,如下图,

此图是 Topic 的分区 log 的示意图,可见,每个分区上的 log 都是一个有序的队列,所以,Kafka 是分区级别有序的。如果,某个 Topic 只有一个分区,那么这个 Topic 下的消息就都是有序的。

分区是为了提升消息处理的吞吐率而产生的,将一个 Topic 中的消息分成几份,分别给不同的 Broker 处理。如下图,

此图中有2个 Broker,Server 1 和 Server 2,每个 Broker 上有2个分区,总共4个分区,P0 ~ P3;有2个 Consumer Group,Consumer Group A 有2个 Consumer,Consumer Group B 有4个 Consumer。Kafka 的实现是,在稳定的情况下,维持固定的连接,每个 Consumer 稳定的消费其中某几个分区的消息,以上图举例,Consumer Group A 中的 C1 稳定消费 P0、P3,C2 稳定消费 P1、P2。这样的连接分配可能会导致消息消费的不均匀分布,但好处是比较容易保证顺序性。

维持完全的顺序性在分布式系统看来几乎是无意义的。因为,如果需要维持顺序性,那么就只能有一条线程阻塞的处理顺序消息,即,Producer -> MQ -> Consumer 必须线程上一一对应。这与分布式系统的初衷是相违背的。但是局部的有序性,是可以维持的。比如,有30000条消息,每3条之间有关联,1->2->3,4->5->6,……,但是全局范围来看,并不需要保证 1->4->7,可以 7->4->1 的顺序来执行,这样可以达到最大并行度10000,而这通常是现实中我们面对的情况。通常应用中,将有先后关系的消息发送到相同的分区上,即可解决大部分问题。

关于副本

副本是高可用 Kafka 集群的实现方式。假设集群中有3个 Broker,那么可以指定3个副本,这3个副本是对等的,对于某个 Topic 的分区来说,其中一个是 Leader,即主节点,另外2个副本是 Follower,即从节点,每个副本在一个 Broker 上。当 Leader 收到消息的时候,会将消息写一份到副本中,通常情况,只有 Leader 处于工作状态。在 Leader 发生故障宕机的时候,Follwer 会取代 Leader 继续传送消息,而不会发生消息丢失。Kafka 的副本是以分区为单位的,也就是说,即使是同一个 Topic,其不同分区的 Leader 节点也不同。甚至,Kafka 倾向于用不同的 Broker 来做分区的 Leader,因为这样能做到更好的负载均衡。

在副本间的消息同步,实际上是复制消息的 log,复制可以是同步复制,也可以是异步复制。同步复制是说,当 Leader 收到消息后,将消息写入从副本,只有在收到从副本写入成功的确认后才返回成功给 Producer;异步复制是说,Leader 将消息写入从副本,但是不等待从副本的成功确认,直接返回成功给 Producer。同步复制效率较低,但是消息不会丢;异步复制效率高,但是在 Broker 宕机的时候,可能会出现消息丢失。

关于丢消息和重复收到消息

任何一个 MQ 都需要处理丢消息和重复收到消息的,正常情况下,Kafka 可以保证:1. 不丢消息;2. 不重复发消息;3. 消息读且只读一次。当然这都是正常情况,极端情况,如 Broker 宕机,断电,这类情况下,Kafka 只能保证 1 或者 2,无法保证 3。

在有副本的情况下,Kafka 是可以保证消息不丢的,其前提是设置了同步复制,这也是 Kafka 的默认设置,但是可能出现重复发送消息,这个交给上层应用解决;在生产者中使用异步提交,可以保证不重复发送消息,但是有丢消息的可能,如果应用可以容忍,也可以接受。如果需要实现读且只读一次,就比较麻烦,需要更底层的 API 4

Kafka 的工具和编程接口

Kafka 的工具

Kafka 提供的工具还是比较全的,bin/ 目录下的工具有以下一些,

1
2
3
4
5
6
7
bin/connect-distributed.sh bin/kafka-consumer-offset-checker.sh bin/kafka-replica-verification.sh bin/kafka-verifiable-producer.sh
bin/connect-standalone.sh bin/kafka-consumer-perf-test.sh bin/kafka-run-class.sh bin/zookeeper-security-migration.sh
bin/kafka-acls.sh bin/kafka-mirror-maker.sh bin/kafka-server-start.sh bin/zookeeper-server-start.sh
bin/kafka-configs.sh bin/kafka-preferred-replica-election.sh bin/kafka-server-stop.sh bin/zookeeper-server-stop.sh
bin/kafka-console-consumer.sh bin/kafka-producer-perf-test.sh bin/kafka-simple-consumer-shell.sh bin/zookeeper-shell.sh
bin/kafka-console-producer.sh bin/kafka-reassign-partitions.sh bin/kafka-topics.sh
bin/kafka-consumer-groups.sh bin/kafka-replay-log-producer.sh bin/kafka-verifiable-consumer.sh

我常用的命令有以下几个,

1
2
3
4
5
6
7
8
9
bin/kafka-server-start.sh -daemon config/server.properties &
bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
bin/kafka-topics.sh --list --zookeeper 192.168.232.23:2181
bin/kafka-topics.sh --delete --zookeeper 192.168.232.23:2181 --topic topic1
bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1
bin/kafka-console-consumer.sh --zookeeper 192.168.232.23:2181 --topic topic1 --from-beginning
bin/kafka-console-producer.sh --broker-list 192.168.232.23:9092 --topic topic1

kafka-server-start.sh 是用于 Kafka 的 Broker 启动的,主要就一个参数 config/server.properties,该文件中的配置项待会再说.还有一个 -daemon 参数,这个是将 Kafka 放在后台用守护进程的方式运行,如果不加这个参数,Kafka 会在运行一段时间后自动退出,据说这个是 0.10.0.0 版本才有的问题 5kafka-topics.sh 是用于管理 Topic 的工具,我主要用的 --describe--list--delete--create 这4个功能,上述的例子基本是不言自明的,--replication-factor 3--partitions 2 这两个参数分别表示3个副本(含 Leader),和2个分区。kafka-console-consumer.shkafka-console-producer.sh 是生产者和消费者的简易终端工具,在调试的时候比较有用,我常用的是 kafka-console-consumer.sh。我没有用 Kafka 自带的 zookeeper,而是用的 zookeeper 官方的发布版本 3.4.8,端口是默认2181,与 Broker 在同一台机器上。

下面说一下 Broker 启动的配置文件 config/server.properties,我在默认配置的基础上,修改了以下一些,

1
2
3
4
broker.id=0
listeners=PLAINTEXT://192.168.232.23:9092
log.dirs=/tmp/kafka-logs
delete.topic.enable=true

broker.id 是 Kafka 集群中的 Broker ID,不可重复,我在多副本的实验中,将他们分别设置为0、1、2;listeners 是 Broker 监听的地址,默认是监听 localhost:9092,因为我不是单机实验,所以修改为本机局域网地址,当然,如果要监听所有地址的话,也可以设置为 0.0.0.0:9092,多副本实验中,将监听端口分别设置为 9092、9093、9094;log.dirs 是 Broker 的 log 的目录,多副本实验中,不同的 Broker 需要有不同的 log 目录;delete.topic.enable 设为 true 后,可以删除 Topic,并且连带 Topic 中的消息也一并删掉,否则,即使调用 kafka-topics.sh --delete 也无法删除 Topic,这是一个便利性的设置,对于开发环境可以,生产环境一定要设为 false(默认)。实验中发现, 如果有消费者在消费这个 Topic,那么也无法删除,还是比较安全的。

剩下的工具多数在文档中也有提到。如果看一下这些脚本的话,会发现多数脚本的写法都是一致的,先做一些参数的校验,最后运行 exec $base_dir/kafka-run-class.sh XXXXXXXXX "$@",可见,这些工具都是使用运行 Java Class 的方式调用的。

Kafka 的 Java API

在编程接口方面,官方提供了 Scala 和 Java 的接口,社区提供了更多的其他语言的接口,基本上,无论用什么语言开发,都能找到相应的 API。下面说一下 Java 的 API 接口。

生产者的 API 只有一种,相对比较简单,代码如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SimpleProducerDemo {
public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.232.23:9092,192.168.232.23:9093,192.168.232.23:9094");
props.put("zookeeper.connect", "192.168.232.23:2181");
props.put("client.id", "DemoProducer");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
String topic = "topic1";
Boolean isAsync = false;
int messageNo = 1;
while (true) {
String messageStr = "Message_" + String.format("%05d",messageNo);
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
} else { // Send synchronously
try {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
++messageNo;
}
}
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"Send message: (" + String.format("%05d",key) + ", " + message + ") at offset "+ metadata.offset() +
" to partition(" + metadata.partition() +
") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}

上例中使用了同步和异步发送两种方式。在多副本的情况下,如果要指定同步复制还是异步复制,可以使用 acks 参数,详细参考官方文档 Producer Configs 部分的内容;在多分区的情况下,如果要指定发送到哪个分区,可以使用 partitioner.class 参数,其值是一个实现了 org.apache.kafka.clients.producer.Partitioner 接口的类,用于根据不同的消息指定分区6。消费者的 API 有几种,比较新的 API 如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.232.23:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2", "topic3"));
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(100);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("Received message: (" + String.format("%05d", record.key()) + ", " + record.value() + ") at offset " + record.offset());
}
}
}
}

消费者还有旧的 API,比如 ConsumerSimpleConsumer API,这些都可以从 Kafka 代码的 kafka-example 中找到,上述的两个例子也是改写自 kafka-example。使用新旧 API 在功能上都能满足消息收发的需要,但新 API 只依赖 kafka-clients,打包出来的 jar 包会小很多,以我的测试,新 API 的消费者 jar 包大约有 2M 左右,而旧 API 的消费者 jar 包接近 16M。

其实,Kafka 也提供了按分区订阅,可以一次订阅多个分区 TopicPartition[];也支持手动提交 offset,需要调用 consumer.commitSync

Kafka 似乎没有公开 Topic 创建以及修改的 API(至少我没有找到),如果生产者向 Broker 写入的 Topic 是一个新 Topic,那么 Broker 会创建这个 Topic。创建的过程中会使用默认参数,例如,分区个数,会使用 Broker 配置中的 num.partitions 参数(默认1);副本个数,会使用 default.replication.factor 参数。但是通常情况下,我们会需要创建自定义的 Topic,那官方的途径是使用 Kafka 的工具。也有一些非官方的途径 7,例如可以这样写,

1
2
3
4
5
6
7
8
9
10
11
12
String[] options = new String[]{
"--create",
"--zookeeper",
"192.168.232.23:2181",
"--partitions",
"2",
"--replication-factor",
"3",
"--topic",
"topic1"
};
TopicCommand.main(options);

但是这样写有一个问题,在执行完 TopicCommand.main(options); 之后,系统会自动退出,原因是执行完指令之后,会调用 System.exit(exitCode); 系统直接退出。这样当然不行,我的办法是,把相关的执行代码挖出来,写一个 TopicUtils 类,如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import joptsimple.OptionSpecBuilder;
import kafka.admin.TopicCommand;
import kafka.admin.TopicCommand$;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import scala.runtime.Nothing$;
public class TopicUtils {
// from: http://blog.csdn.net/changong28/article/details/39325079
// from: http://www.cnblogs.com/davidwang456/p/4313784.html
public static void createTopic(){
String[] options = new String[]{
"--create",
"--zookeeper",
KafkaProperties.ZOOKEEPER_URL,
"--partitions",
"2",
"--replication-factor",
"3",
"--topic",
KafkaProperties.TOPIC
};
// TopicCommand.main(options);
oper(options);
}
public static void listTopic(){
String[] options = new String[]{
"--list",
"--zookeeper",
KafkaProperties.ZOOKEEPER_URL
};
// TopicCommand.main(options);
oper(options);
}
public static void deleteTopic(){
String[] options = new String[]{
"--delete",
"--zookeeper",
KafkaProperties.ZOOKEEPER_URL,
"--topic",
KafkaProperties.TOPIC
};
// TopicCommand.main(options);
oper(options);
}
public static void describeTopic(){
String[] options = new String[]{
"--describe",
"--zookeeper",
KafkaProperties.ZOOKEEPER_URL,
"--topic",
KafkaProperties.TOPIC
};
// TopicCommand.main(options);
oper(options);
}
public static void main(String[] args){
listTopic();
createTopic();
listTopic();
describeTopic();
deleteTopic();
try {
Thread.sleep(3*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
listTopic();
}
/** copied & modified from kafka.admin.TopicCommand$.main
*
* @param args
*/
public static void oper(String args[]){
try {
TopicCommand$ topicCommand$ = TopicCommand$.MODULE$;
final TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(args);
if(args.length == 0) {
throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), "Create, delete, describe, or change a topic.");
} else {
int actions =0;
OptionSpecBuilder[] optionSpecBuilders = {opts.createOpt(), opts.listOpt(), opts.alterOpt(), opts.describeOpt(), opts.deleteOpt()};
for (OptionSpecBuilder temp:optionSpecBuilders){
if (opts.options().has(temp)) {
actions++;
}
}
if(actions != 1) {
throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), "Command must include exactly one action: --list, --describe, --create, --alter or --delete");
} else {
opts.checkArgs();
ZkUtils zkUtils = kafka.utils.ZkUtils$.MODULE$.apply((String)opts.options().valueOf(opts.zkConnectOpt()), 30000, 30000, JaasUtils.isZkSecurityEnabled());
byte exitCode = 0;
try {
try {
if(opts.options().has(opts.createOpt())) {
topicCommand$.createTopic(zkUtils, opts);
} else if(opts.options().has(opts.alterOpt())) {
topicCommand$.alterTopic(zkUtils, opts);
} else if(opts.options().has(opts.listOpt())) {
topicCommand$.listTopics(zkUtils, opts);
} else if(opts.options().has(opts.describeOpt())) {
topicCommand$.describeTopic(zkUtils, opts);
} else if(opts.options().has(opts.deleteOpt())) {
topicCommand$.deleteTopic(zkUtils, opts);
}
} catch (final Throwable var12) {
scala.Predef$.MODULE$.println((new StringBuilder()).append("Error while executing topic command : ").append(var12.getMessage()).toString());
System.out.println(var12);
exitCode = 1;
return;
}
} finally {
zkUtils.close();
// System.exit(exitCode);
}
}
}
} catch (Nothing$ nothing$) {
nothing$.printStackTrace();
}
}
}

以上的 oper 方法改写自 kafka.admin.TopicCommand$.main 方法。可以发现这部分代码非常怪异,原因是 TopicCommand$ 是 Scala 写的,再编译成 Java class 字节码,然后我根据这些字节码反编译得到 Java 代码,并以此为基础进行修改,等于是我在用 Java 的方式改写 Scala 的代码,难免会觉得诡异。当然,这种写法用在生产环境的话是不太合适的,因为调用的 topicCommand$.createTopic 等方法都没有抛出异常,例如参数不合法的情况,而且也没有使用 log4j 之类的 log 库,只是用 System.out.println 这样的方法屏显,在出现错误的时候,比较难以定位。

Kafka 副本和集群

在生产环境中,Kafka 总是以“集群+分区”方式运行的,以保证可靠性和性能。下面是一个3副本的 Kafka 集群实例。

首先,需要启动3个 Kafka Broker,Broker 的配置文件分别如下,

1
2
3
4
5
6
7
8
9
10
11
broker.id=0
listeners=PLAINTEXT://192.168.232.23:9092
log.dirs=/tmp/kafka-logs
broker.id=1
listeners=PLAINTEXT://192.168.232.23:9093
log.dirs=/tmp/kafka-logs-1
broker.id=1
listeners=PLAINTEXT://192.168.232.23:9094
log.dirs=/tmp/kafka-logs-2

虽然每个 Broker 只配置了一个端口,实际上,Kafka 会多占用一个,可能是用来 Broker 之间的复制的。另外,3个 Broker 都配置了,

1
2
zookeeper.connect=localhost:2181
delete.topic.enable=true

在同一个 Zookeeper 上的 Broker 会被归类到一个集群中。注意,这些配置中并没有指定哪一个 Broker 是主节点,哪些 Broker 是从节点,Kafka 采用的办法是从可选的 Broker 中,选出每个分区的 Leader。也就是说,对某个 Topic 来说,可能0节点是 Leader,另外一些 Topic,可能1节点是 Leader;甚至,如果 topic1 有2个分区的话,分区1的 Leader 是0节点,分区2的 Leader 是1节点。

这种对等的设计,对于故障恢复是十分有用的,在节点崩溃的时候,Kafka 会自动选举出可用的从节点,将其升级为主节点。在崩溃的节点恢复,加入集群之后,Kafka 又会将这个节点加入到可用节点,并自动选举出新的主节点。

实验如下,先新建一个3副本,2分区的 Topic,

1
bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1

初始状况下,topic1 的状态如下,

1
2
3
4
$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

对于上面的输出,即使没有文档,也可以看懂大概:topic1 有2个分区,Partition 0 和 Partition 1,Leader 分别在 Broker 0 和 1。Replicas 表示副本在哪些 Broker 上,Isr(In-Sync Replicas)表示处于同步状态中的 Broker,如果有 Broker 宕机了,那么 Replicas 不会变,但是 Isr 会仅显示没有宕机的 Broker,详见下面的实验。

然后分2个线程,运行之前写的 Producer 和 Consumer 的示例代码,Producer 采用异步发送,消息采用同步复制。在有消息传送的情况下,kill -9 停掉其中2个 Broker(Broker 0 和 Broker 1),模拟突然宕机。此时,topic1 状态如下,

1
2
3
4
$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2
Topic: topic1 Partition: 1 Leader: 2 Replicas: 1,2,0 Isr: 2

可见,Kafka 已经选出了新的 Leader,消息传送没有中断。接着再启动被停掉的那两个 Broker,并查看 topic1 的状态,如下,

1
2
3
4
5
6
7
8
9
$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0
Topic: topic1 Partition: 1 Leader: 2 Replicas: 1,2,0 Isr: 2,1,0
$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 2,1,0

可以发现, 有一个短暂的时间,topic1 的两个分区的 Leader 都是 Broker 2,但是在 Kafka 重新选举之后,分区1的 Leader 变为 Broker 1。说明 Kafka 倾向于用不同的 Broker 做分区的 Leader,这样更能达到负载均衡的效果。

再来看看 Producer 和 Consumer 的日志,下面这个片段是2个 Broker 宕机前后的日志,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
......
Send message: (00439, Message_00439) at offset 217 to partition(0) in 3 ms
Received message: (00438, Message_00438) at offset 216
Send message: (00440, Message_00440) at offset 218 to partition(0) in 5 ms
Send message: (00441, Message_00441) at offset 221 to partition(1) in 5 ms
Received message: (00441, Message_00441) at offset 221
Received message: (00439, Message_00439) at offset 217
Send message: (00442, Message_00442) at offset 222 to partition(1) in 5 ms
Send message: (00443, Message_00443) at offset 219 to partition(0) in 3 ms
Received message: (00440, Message_00440) at offset 218
Received message: (00443, Message_00443) at offset 219
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
Received message: (00442, Message_00442) at offset 222
Send message: (00452, Message_00452) at offset 223 to partition(1) in 7492 ms
Send message: (00454, Message_00454) at offset 224 to partition(1) in 7485 ms
Send message: (00455, Message_00455) at offset 225 to partition(1) in 7482 ms
Send message: (00458, Message_00458) at offset 226 to partition(1) in 7473 ms
Send message: (00460, Message_00460) at offset 227 to partition(1) in 7467 ms
Send message: (00461, Message_00461) at offset 228 to partition(1) in 7465 ms
Send message: (00462, Message_00462) at offset 229 to partition(1) in 7462 ms
Send message: (00463, Message_00463) at offset 230 to partition(1) in 7459 ms
Send message: (00464, Message_00464) at offset 231 to partition(1) in 7456 ms
Send message: (00465, Message_00465) at offset 232 to partition(1) in 7453 ms
......
Send message: (01103, Message_01103) at offset 543 to partition(1) in 5478 ms
Received message: (00631, Message_00631) at offset 310
Received message: (00633, Message_00633) at offset 311
Send message: (00451, Message_00451) at offset 220 to partition(0) in 7525 ms
Received message: (00634, Message_00634) at offset 312
Send message: (00453, Message_00453) at offset 221 to partition(0) in 7518 ms
Received message: (00639, Message_00639) at offset 313
Send message: (00456, Message_00456) at offset 222 to partition(0) in 7509 ms
Received message: (00641, Message_00641) at offset 314
Send message: (00457, Message_00457) at offset 223 to partition(0) in 7506 ms
Received message: (00643, Message_00643) at offset 315
......

出现错误的时候,Producer 抛出了 NetworkException 异常。其中有3589条 Received 日志,3583条 Send 日志,7条 NetworkException 异常日志,发送消息的最大序号是3590,接收消息的最大序号是3589,有以下几个值得注意的地方,

  1. 宕机之前,消息的接收并不是顺序的,这是因为 topic1 有2个分区,Kafka 只保证分区上的有序;
  2. 宕机之后,出现了长段的发送日志而没有接收日志,说明 Kafka 此时正在选举,选举的过程会阻塞消费者;
  3. 从接收消息的条数和序号来看,所有的消息都收到了,没有丢(没有收到3590的消息可能是因为强制退出 client 进程的原因),发送的过程的7个异常应该只是虚警,7条异常对应序号444~450,3583条 Send 消息再加上这7条,与总消息3590条一致;

从这个实验中,可以看到,虽然 Kafka 不保证消息重复发送,但是却在尽量保证没有消息被重复发送,可能我的实验场景还不够极端,没有做出消息重复的情况。

如之前所说,如果要保持完全顺序性,需要使用单分区;如果要避免抛出 NetworkException 异常,就使用 Producer 同步发送。下面,我们重做上面的例子,不同之处是使用单分区和 Producer 同步发送,截取一段 Broker 宕机时的日志如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
......
Sent message: (118, Message_00118)
Received message: (00118, Message_00118) at offset 117
Received message: (00119, Message_00119) at offset 118
Sent message: (119, Message_00119)
Sent message: (120, Message_00120)
Received message: (00120, Message_00120) at offset 119
Sent message: (121, Message_00121)
Received message: (00121, Message_00121) at offset 120
Sent message: (122, Message_00122)
Sent message: (123, Message_00123)
Sent message: (124, Message_00124)
Sent message: (125, Message_00125)
Sent message: (126, Message_00126)
Sent message: (127, Message_00127)
......

可见,由于采用同步发送,Broker 宕机并没有造成抛出异常,另外,由于使用单分区,顺序性也得到了保证,全局没有出现乱序的情况。

综上,是否使用多分区更多的是对顺序性的要求,而使用 Producer 同步发送还是异步发送,更多是出于重复消息的考虑,如果异步发送抛出异常,在保证不丢消息的前提下,势必要重发消息,这就会导致收到重复消息。多分区和 Producer 异步发送,会带来性能的提升,但是也会引入非顺序性,重复消息等问题,如何取舍要看应用的需求。

Kafka 最佳实践

Kafka 在一些应用场景中,有一些前人总结的最佳实践 8 9。对最佳实践,我的看法是,对于自己比较熟悉,有把握的部分,可以按自己的步骤进行;对一些自己不清楚的领域,可以借鉴其中的一些内容,至少不会错的特别厉害。有文章10说,Kafka 在分区比较多的时候,相应时间会变长,这个现象值得在实践中注意。

后记

在 Kafka 与 RocketMQ 的对比中,RocketMQ 的一个核心功能就是可以支持同步刷盘,此时,即使突然断电,也可以保证消息不丢;而 Kafka 采用的是异步刷盘,即使返回写入成功,也只是写入缓冲区成功,并非已经持久化。因此,如果出现断电或 kill -9 的情况,Kafka 内存中的消息可能丢失。另外,同步刷盘的效率是比较低下的,一般生产中估计也不会使用,可以用优雅关闭的方式来关闭进程。如果不考虑这些极端情况的话,Kafka 基本是一个很可靠的消息中间件。

文章目录
  1. 1. 基本概念
    1. 1.1. 关于顺序和分区
    2. 1.2. 关于副本
    3. 1.3. 关于丢消息和重复收到消息
  2. 2. Kafka 的工具和编程接口
    1. 2.1. Kafka 的工具
    2. 2.2. Kafka 的 Java API
  3. 3. Kafka 副本和集群
  4. 4. Kafka 最佳实践
  5. 5. 后记

欢迎来到Valleylord的博客!

本博的文章尽量原创。