第1章 初识Kafka
Kafka是一个多分区、多副本且基于ZooKeeper协调的分布式消息系统。
Kafka的三大角色:
- 消息系统:具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能,同时还提供消息顺序性保障及回溯消费的功能。
- 存储系统:Kafka把消息持久化到磁盘。可以把Kafka作为长期的数据存储系统,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能。
- 流式处理平台:Kafka不仅为流式处理框架提供可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
基本概念
Kafka体系
- 若干Producer:将消息发送到Broker
- 若干Broker:将收到的消息存储到磁盘中
- 若干Consumer:从Broker订阅并消费信息
- 一个Zookeeper集群:负责集群元数据的管理、控制器的选举等操作
主题、分区、偏移量
主题是一个逻辑上的概念,可以细分为多个分区
同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志文件,消息在被追加到分区日志文件的时候会分配一个特定的偏移量,Kafka通过偏移量来保证消息在分区内的顺序性,Kafka保证的是分区有序而不是主题有序。
Kafka中的分区可以分布在不同的broker上,也就是说,一个主题可以横跨多个broker,以此来提供必单个broker更强大的性能。
Kafka为分区引入了多副本机制,同一分区的不同副本中保存的是相同的信息(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”关系,leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。
目前Kafka只支持增加分区,不支持减少分区
分区副本数可以增加也可以减少
AR、ISR、OSR
分区中的所有副本统称为AR(Assigned Replicas)。所有与leader副本保持一定程度同步的副本(包括leader副本)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集。
与leader副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas)。
AR = ISR + OSR。正常情况下,AR = ISR,OSR集合为空。
leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把他从ISR集合中剔除。如果OSR集合中有follower副本追上了leader副本,那么leader副本会把它从OSR转移至ISR。
默认情况下只有ISR集合中的副本可以被选举为leader,不过也可以通过修改参数来改变。
HW、LEO
HW(High Watermark),俗称高水位,消费者只能拉取到这个offset之前的消息。
上图中消费者只能拉取到offset在0到5之间的消息。
LEO(Log End Offset),标识当前日志文件中下一条待写入消息的offset,相当于当前分区中最后一条消息的offset值+1。
ISR集合中最小的LEO即为分区的HW。可以理解为消息只有被所有分区都写入后,消费者才可以拉取到。
第2章 生产者
客户端开发
消息的发送
KafkaProducer的send()方法:
1 | public Future<RecordMetadata> send(ProducerRecord<K,V> record) |
同步发送
要实现同步的发送方式,可以利用返回的Future对象实现:
1 | try { |
send()方法是异步的,返回的Future对象可以使调用方稍后获得发送的结果,调用get()方法阻塞等待Kafka的相应。
也可以通过RecordMetadata对象获取一些元数据信息。
1 | try { |
Future的get(long timeout, TimeUnit unit)方法可以实现可超时的阻塞。
异步发送
使用Callback的方式,Kafka有响应时就会回调,要么发送成功,要么抛出异常。
1 | producer.send(record, new Callback() { |
onCompletion()方法的两个参数是互斥的,消息发送成功时,metadata不为null,消息发送异常时,exception不为null。
对同一个分区而言,KafkaProducer可以保证先发送的消息对应的callback先调用,也就是说,回调函数的调用也可以保证分区有序。
常见的异常
可重试的异常:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException等,重试之后可以恢复。
不可重试的异常:RecordTooLargeException,Kafka不会进行任何重试,直接抛出异常。
对于可重试的异常,如果配置了retries参数,只要在规定的重试次数内自行恢复了,就不会抛出异常。
分区器
send() -> 拦截器 -> 序列化器 -> 分区器 -> broker
拦截器不是必需的,序列化器是必需的。
消息经过序列化后需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,就不需要分区器,如果没有指定,就需要依赖分区器,根据key来计算partition值。
默认分区器DefaultPartitioner,如果key不为null,根据key的hash值来计算分区号,如果key为null,消息将会以轮询的方式发往各个可用分区
如果key不为null,那么计算得到的分区号会是所有分区中的任意一个;
如果key为null并且有可用分区时,那么计算得到的分区号仅为可用分区中的任意一个。
自定义分区器:
实现Partitioner类,重写partition方法。
生产者拦截器
KafkaProducer会在消息被应答前或消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的Callback。
可以配置多个拦截器,组成拦截链,拦截链会按照配置顺序依次执行。
如果某个拦截器执行失败,下一个拦截器会接着从上一个执行成功的拦截器继续执行。
原理分析
整体架构
整个生产者客户端由两个线程协调运行,分别为主线程和Sender线程。
在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator)中。
Sender线程负责从消息累加器中获取消息并将其发送到Kafka。
RecordAccumulator用来缓存消息以便Sender线程可以批量发送,缓存的大小通过生产者客户端参数buffer.memory配置,默认32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么阻塞,要么抛出异常,取决于max.block.ms,默认60s。
主线程发过来的消息会被追加到RecordAccumulator的某个双端队列中,RecordAccumulator的内部为每个分区都维护了一个双端队列,队列的内容是ProducerBatch,即Deque
ProducerRecord是生产者中创建的消息,ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中。这样可以使字节更紧凑,减少网络请求次数。
消息在网络上都是以字节的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放,为了实现ByteBuffer的复用,实现缓存的高效利用,在RecordAccumulator内部还有一个BufferPool,它只针对特定大小的ByteBuffer进行管理,其他大小的ByteBuffer不会被缓存进BufferPool中。大小由batch.size指定,默认16KB,我们可以适当调大以便多缓存一些消息。
ProducerBatch的大小和batch.size也有密切关系。当一条消息流入RecordAccumulator时,会先寻找与消息分区对应的双端队列(如果没有则新建),再从队列的尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size,如果不超过,就以batch.size大小创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,就以评估的大小创建ProducerBatch,这段内存区域不会被复用。
请求重Sender线程发往Kafka之前还会保存到InFlightRequests中,它的作用是缓存了已经发出去但还没有收到响应的请求,配置参数为max.in.flight.requests.per.connection,默认5个,即每个连接最多只能缓存5个未响应的请求,超过该数值后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了相应。通过比较Deque
InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的。InFlightRequests中未确认的请求越少,负载越小。
选择leastLoadedNode发送请求可以使它尽快发出,避免网络拥塞。
元数据的更新
元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群有哪些节点,控制器节点是哪个,客户端自己发现其他broker节点,分区数量及leader副本的动态变化等。
当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms(默认5分钟)时间没有更新元数据都会引起元数据的更新操作。
元数据的更新是在客户端内进行的,对客户端的外部使用者不可见。
当需要更新元数据时,会挑选出leastLoadedNode,向这个Node发送MetadataRequest来获取具体的元数据信息。这个更新操作由Sender线程发起,创建完MetadataRequest后同样会存入InFlightRequests,之后的步骤和发送消息类似。主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障。
重要的生产者参数
acks
acks = 1。默认值即为1,生产者发送消息后,只要分区的leader副本成功写入,那么它就会收到来自服务端的成功相应。如果消息成功写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失。这是消息可靠性和吞吐量之间的折中方案。
acks = 0。生产者发送消息后不需要等待任何服务端相应。
acks = -1 或 acks = all。生产者消息发送之后,需要等待ISR中所有副本都成功写入后才能收到服务端的成功相应。
当ISR中之后leader副本时,就会退化成acks = 1。
max.request.size
用来限制生产者客户端能发送的消息的最大值,默认为1MB。
不建议修改,比如将broker端的message.max.bytes参数配置为10,而max.request.size配置为20,当我们发送一条大小为15B的消息时,生产者客户端就会报错。
retries和retry.backoff.ms
retries参数用来配置生产者重试的次数,默认值为0。可以配置大于0,通过内部重试来恢复一些异常。
retry.backoff.ms默认100,用来设定两次重试之间的时间间隔,避免无效的频繁重试。
max.in.flight.requests.per.connection
每个连接最多能缓存未响应的请求的个数
对于某些应用,比如Mysql的binlog传输,顺序性非常重要,如果将acks参数配置为非零值,并且max.in.flight.requests.per.connection参数配置为大于1的值,就会出现错序的现象:
如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。
一般在需要保证消息顺序的场合建议把max.in.flight.requests.per.connection配置为1,而不是把acks配置为0,不过这样也会影响整体的吞吐。
第3章 消费者
消费者与消费者组
消费者(Consumer)负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。
每个消费者都有一个对应的消费者组(Consumer Group),当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。
图中的两个消费组之间互不影响,每个分区只能被一个消费组中的一个消费者所消费。
可以通过消费者客户端参数partition.assignment.strategy来设置消费者与主题之间的分区分配策略。
Kafka同时支持两种消息投递模式:
- 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,相当于点对点模式。
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,相当于发布/订阅模式。
消费组是逻辑上的概念,消费者并非逻辑上的概念,它是实际的应用实例,可以是一个线程,也可以是一个进程。
客户端开发
订阅主题与分区
一个消费者可以订阅一个或多个主题。
1.集合的方式订阅主题
consumer.subscribe(Arrays.asList(topic1));
2.正则的方式订阅主题
consumer.subscribe(Pattern.compile(“topic-.*”));
如果有新创建的主题与正则匹配,就可以消费到新添加的主题。
3.订阅某些特定分区
consumer.assign(Arrays.asList(new TopicPartition(“topic1”,0)));
如果不知道有哪些分区,可以通过consumer.partitionsFor(topic)获取。
通过subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区策略自动分配各个消费者与分区的关系。在消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。
而assign()方法不具备。
消息消费
Kafka中的消息基于拉模式。是一个不断轮询的过程,消费者要做的就是重复调用poll()方法,poll()方法返回的是所订阅的主题上的一组消息,类型是ConsumerRecords,里面包含若干ComsumerRecord。
可以通过
1 | public List<ConsumerRecord<K, V>> records(TopicPartition partition) |
方法获取消息集中指定分区的消息。
如果订阅多个主题,可以通过
1 | public Iterable<ConsumerRecord<K, V>> records(String topic) |
按照主体维度进行消费。
位移提交
在旧的消费者客户端中,消费位移存储在Zookeeper中,新消费者客户端存储在Kafka内部主题__consumer_offsets中。
表示当前已经消费了x,但当前消费者需要提交的消费位移并不是x,而是x+1。
(1)自动提交
Kafka默认的消费位移的提交方式是自动提交,由enable.auto.commit配置,默认为true。
自动提交不是每消费一条消息就提交一次,而是定期提交,由auto.commit.interval.ms配置,默认5s。
消费者每隔5s会将拉取到的每个分区中的最大的消息位移进行提交,提交的动作是在poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,就会提交上一次轮询的位移。
(2)手动提交
前提:enable.auto.commit配置为false
同步提交 —— commitSync():
commitSync()方法会根据poll()方法拉取的最新位移来进行提交,只要没有发生不可恢复的错误,就会阻塞消费者线程直到位移提交完成。
异步提交 —— commitAsync():
异步提交在执行的时候消费者线程不被阻塞。
控制或关闭消费
1 | public void pause(Collection<TopicPartition> partitions) |
使用pause()和resume()方法实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作。
如何优雅地退出while(isRunning.get())循环:
通过在其他地方设置isRunning.set(false)或者从其他线程里调用KafkaConsumer的wakeup()方法,调用后可以退出poll()逻辑,并抛出异常。
跳出循环后要显示的执行close()方法。
指定消费位移
没有可以查找的消费位移的情况:
1.当一个新的消费组建立的时候
2.消费组内的一个新消费者订阅了一个新的主题
3.当__consuemer_offsets主题中有关这个消费组的位移信息过期而被删除
每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始消费。默认值为“latest”。
当配置为“none”时,意味着当找不到位移时既不从最新的消息位置处开始消费,也不从最早的消息位置出开始消费,此次会报NoOffsetForPartitionException。
除了找不到消费位移,位移越界也会触发auto.offset.reset参数的执行
从特定的位移处开始拉取消息——seek()
1 | public void seek(TopicPartition partition, long offset) |
执行seek()方法前需要先执行一次poll()方法,等到分配到分区之后才可以重置消费位置。
从分区的开头或末尾开始消费——seekToBeginning()、seekToEnd()
从具体时间点开始消费
首先通过offsetForTimes()方法获取一天之前的消息位置,然后使用seek()方法追溯到相应位置开始消费。
再均衡
再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,使我们可以既方便又安全地删除消费者组内的消费者或往消费组内添加消费者。
再均衡发生期间,消费组会变得不可用。如果消费者消费完某个分区的一部分消息时还没来得及提交就发生了再均衡,这个分区又被分配给了另一个消费者,原来消费完的消息又被重新消费一遍,发生了重复消费。
再均衡监听器:ConsumerRebalanceListener接口
1 | // 再均衡开始之前和消费者停止读取消息之后被调用 |
在发生再均衡之前通过再均衡监听器的onPartitionsRevoked()回调执行commitSync()方法同步提交消费位移,避免重复消费
消费者拦截器
实现 ConsumerInterceptor接口,重写onConsume()、onCommit()、close()方法。
1 | public ConsumerRecords<K,V> onConsume(ConsumerRecords<K,V> records); |
KakfaConsumer会在poll()方法返回之前调用拦截器的onConsume()方法来对消息进行定制化操作。
在提交完消费位移之后调用onCommit()方法,可以使用这个方法记录跟踪所提交的位移信息。比如当消费者使用commitSync的无参方法时,我们不知道提交的消费位移的细节,使用onCommit()方法可以做到这一点。
重要的消费者参数
第4章 主题与分区
分区的管理
优先副本的选举
在创建主题的时候,该主题的分区及副本会尽可能均匀地分布到Kafka集群的各个broker节点上,对应的leader副本的分配也比较均匀。
当分区的leader节点发生故障时,其中一个follower节点就会成为新的leader节点,当原来的leader节点恢复之后重新加入集群时,它只能成为一个新的follower节点而不再对外提供服务,这样就会导致集群的负载不均衡。
为了治理负载失衡的情况,Kafka引入了优先副本的概念。
优先副本指在AR集合列表中的第一个副本,理想情况下,优先副本就是该分区的leader副本。
所谓的优先副本的选举就是指通过一定的方式促使优先副本选举为leader副本,促进集群的负载均衡,这一行为也称为”分区平衡“。
Kafka提供分区自动平衡功能,broker端参数是auto.leader.rebalance.enable,默认为true。
开启后,Kafka的控制器会启动一个定时任务,轮询所有broker,计算每个broker的分区不平衡率是否超过leader.imbalance.per.broker.percentage(默认10%),如果超过则自动执行优先副本的选举。执行周期为leader.imbalance.check.interval.seconds(默认5分钟)。
生产环境不建议开启,因为这会引起性能问题,可以在波谷期手动执行。
Kafka的kafka-perferred-replica-election.sh脚本提供了对分区leader副本进行重新平衡的功能。可以通过配置path-to-json-file文件来制定执行优先副本选举的分区清单,从而达到分批、手动执行的效果。
分区重分配
需要分区重分配的场景:
(1)当要对集群中的一个节点进行有计划地下线操作时,为了保证分区及副本的合理分配,将该节点上的分区副本迁移到其他的可用节点上。
(2)当集群中新增broker节点时,只有新创建的主题分区才有可能被分配到这个节点上,之前的主题分区并不会自动分配到新加入的节点上,新节点的负载和老节点的负载不均衡。
Kafka提供了kafka-reassign-partition.sh脚本来执行分区重分配。
执行步骤:
(1)创建一个包含主题清单的JSON文件
(2)根据主题清单和broker节点清单生成一份重分配方案
(3)根据方案执行重分配动作
第5章 日志存储
文件目录布局
一个副本对应一个日志(Log),为了防止Log过大,Kafka引入了日志分段(LogSegment)概念,将Log切分为多个LogSegment。
Log和LogSegment不是纯粹物理意义上的概念,Log在物理上只以文件夹的形式存储,而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。
向Log中追加消息时是顺序写入的,只有最后一个LogSegment才能执行写入操作,在此之前所有的LogSegment都不能写入数据。
每个LogSegment中的日志文件(以.log为文件后缀)都有对应的两个索引文件:偏移量索引文件(以.index为文件后缀)和时间戳索引文件(以.timeindex为文件后缀)。
日志清理
两种日志清理策略:
(1)日志删除:按照一定的保留策略直接删除不符合条件的日志分段。
(2)日志压缩:针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。
通过broker端参数log.cleanup.policy设置delete/compact。
日志删除
1.基于时间
日志删除任务会检查当前日志文件中是否保留时间超过设定的阈值来寻找可删除
的日志分段文件集合。
2.基于日志大小
日志检查任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合。
3.基于日志起始偏移量
判断依据是某日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset,若是,则可以删除此日志分段。
磁盘存储
Kafka依赖于文件系统(更底层地来说就是磁盘)来存储和缓存消息。
而磁盘顺序写盘的速度不仅比随机写盘的速度快,而且也比随机写内存的速度快。
Kafka采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作。
页缓存
页缓存是操作系统实现的一种主要的磁盘缓存,以此来减少对磁盘I/O的操作。
具体来说就是磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。
零拷贝
零拷贝技术基于底层的sendfile()方法实现。
非零拷贝技术:
从内核模式复制到用户模式,再从用户模式复制回内核模式,内核和用户模式的上下文切换了4次。
零拷贝技术:
零拷贝通过DMA技术将文件内容复制到内核模式下的Read Buffer中。不过数据没被复制到Socket Buffer,只有包含数据的位置和长度的信息的文件描述符被加到Socket Buffer中。DMA引擎直接将数据从内核模式中传递到网卡设备,数据只经历了2次复制,上下文切换也变成了2次。
第6章 深入服务端
第7章 深入客户端
第8章 可靠性探究
可靠性分析
如何最大程度地提高可靠性
1.副本数
副本数越多越能保证数据的可靠性,不过副本数越多也会引起磁盘、网络带宽的浪费,一般设为3,对可靠性要求高的场景可以设为5。
2.acks
acks=1,生产者将消息发送到leader,leader在成功写入本地日志后会告知生产者已经成功提交。
如果ISR集合中的follower还没来得及拉取,leader就宕机了,消息就会丢失。
acks=-1,生产者将消息发送到leader,leader在成功写入本地日之后还要等待ISR中的follower副本全部同步完成才能告知生产者已经成功提交。
如果leader写入后,被follower同步之前,leader宕机了,生产者会收到异常。
如果leader和follower都写入了,在返回acks之前,leader宕机了,生产者会重发数据,造成数据重复。可以通过开启幂等性避免。
如果某个临界点时所有的follower都被剔除出了ISR,那么ISR中只有leader,acks=-1就变为了acks=1。可以通过设置min.insync.replicas指定ISR中最小的副本数。
3.发送模式
发后即忘、同步、异步
采取同步或异步模式,在出现异常时可以及时获得通知,重试发送。(可能会引起消息重复)
如果配置的retries参数大于0,max.in.flight.requests.per.connectioin参数大于1,可能会影响消息的顺序性。
4.unclean.leader.election.enable(默认false)
如果为true,意味着当leader下线时候,可以从非ISR集合选举新leader,这样可能造成数据的丢失。
5.消费端
将enable.auto.commit设置为false,手动提交位移。
也可以通过回溯消费功能对漏掉的信息进行回补。