文章目录

* 前言 <https://blog.csdn.net/iverson2010112228/article/details/82631554#_1>
* 主要内容 <https://blog.csdn.net/iverson2010112228/article/details/82631554#_5>
* kafka整体架构
<https://blog.csdn.net/iverson2010112228/article/details/82631554#kafka_13>
* kafka定义
<https://blog.csdn.net/iverson2010112228/article/details/82631554#kafka_15>
* 架构图 <https://blog.csdn.net/iverson2010112228/article/details/82631554#_22>
* 名词解释 <https://blog.csdn.net/iverson2010112228/article/details/82631554#_26>
* Kafka重要组件
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Kafka_36>
* 生产者(Producer)
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Producer_38>
* 生产者选择分区
<https://blog.csdn.net/iverson2010112228/article/details/82631554#_50>
* 序列化器和发送
<https://blog.csdn.net/iverson2010112228/article/details/82631554#_55>
* 生产者配置 <https://blog.csdn.net/iverson2010112228/article/details/82631554#_70>
* Broker
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Broker_87>
* Partition Offset
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Partition_Offset_93>
* message物理结构
<https://blog.csdn.net/iverson2010112228/article/details/82631554#message_109>
* Consumer(消费者)
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Consumer_112>
* 消费模式 <https://blog.csdn.net/iverson2010112228/article/details/82631554#_114>
* Consumer消费过程
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Consumer_131>
* Consumer提交偏移量
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Consumer_137>
* Consumer Rebalance
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Consumer_Rebalance_143>
* kafka文件存储
<https://blog.csdn.net/iverson2010112228/article/details/82631554#kafka_160>
* Segment
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Segment_174>
* 消息文件 <https://blog.csdn.net/iverson2010112228/article/details/82631554#_181>
* zookeeper和kafka
<https://blog.csdn.net/iverson2010112228/article/details/82631554#zookeeperkafka_191>
* kafka在zookeeper中的存储结构
<https://blog.csdn.net/iverson2010112228/article/details/82631554#kafkazookeeper_195>
* zookeeper在Kafka中的作用
<https://blog.csdn.net/iverson2010112228/article/details/82631554#zookeeperKafka_199>
* Kafka与zookeeper通信
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Kafkazookeeper_212>
* Kafka高可用
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Kafka_221>
* Controller Failover
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Controller_Failover_223>
* Partition Failover
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Partition_Failover_228>
* Broker Failover
<https://blog.csdn.net/iverson2010112228/article/details/82631554#Broker_Failover_237>
* kafka常见问题
<https://blog.csdn.net/iverson2010112228/article/details/82631554#kafka_240>
* 参考资料 <https://blog.csdn.net/iverson2010112228/article/details/82631554#_253>


<>前言


最近在公司内部进行了一个知识分享,主题是kafka。分享ppt我会上传放在文末。我为了准备这个分享,几乎花了完整两周时间,看了两本书,阅读了几十篇博客,从中提炼出几十页ppt拿出来演讲,分享完后总感觉自己还有很多东西没有讲出来。
这次就把我两个星期的学习成果形成博客。便于自己回顾和大家分享。

<>主要内容

* kafka系统架构概要介绍
* kafka重要组件
* kafka文件存储
* zookeeper与kafka
* 高可用kafka
* kafka常见问题
<>kafka整体架构

<>kafka定义

旧:在kafka0.8.x版本的时候,kafka主要是作为一个分布式的、可分区的、具有副本数的日志服务系统,
具有高水平扩展性、高容错性、访问速度快、分布式等特性;主要应用场景是:日志收集系统和消息系统

新:0.10.x版本及以上,Kafka是一个分布式的流处理平台(数据注入功能,数据存储功能,流处理功能)


注:今天主要还是讲它作为一个消息中间件的功能作用,kafka各个版本内部处理有差异,如何分享是一个挑战。学习的时候一定要带上版本的概念,因为网上很多资料都没有说明版本,然后会造成理解偏差。我们这次主要分享新版本kafka

<>架构图

kafka大体架构图如下:


<>名词解释

* Broker Kafka集群中的服务器
* Topic 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic
* Partition Partition是物理上的概念,每个Topic包含一个或多个Partition
* Replica Partition 的副本,保障 partition 的高可用
* Producer 消息生产者 负责发布消息到Kafka broker
* Consumer 消息消费者,向Kafka broker读取消息的客户端
* Consumer Group 每个 consumer 都属于一个 consumer group
<>Kafka重要组件

<>生产者(Producer)

生产这消息发送过程如下图:


解读
ProducerRecord:每个消息是一个ProducerRecord对象,其中Topic和Value值必填,partition和key非必填。
过程
send()方法大致过程为:
设置序列化器->设置分区->放入队列缓存->等待时机push到broker
注:
不是直接发送给服务端,而是先在客户端把消息放入队列中, 然后由一个消息发送线程从队列中拉取消息,以批量的方式发送消息给服务端。 Kafka的记录收集器(
RecordAccumulator)负责缓存生产者
客户端产生的消息,发送线程(Sender)负责读取记录收集器的批量消息,通过网络发送给服务端。为了保证客户端网络请求的快速响应,Kafka使用选择器(
Selector) 处理网络连接和读写处理,使用 网络连接( NetworkClient)处理客户端网络请求。

<>生产者选择分区

选择分区流程图如下:

其中,散列化方法为:Utils.murmur2(keyBytes)

<>序列化器和发送

发送方式
新版本客户端提供两种发送方式,同步和异步:


可以发现,两个方法其实都是异步返回。
同步方式,第一种,调用send()后,马上get(),实现同步调用。
异步方式,第二种,在callback中进行内容处理,实现异步调用。

序列化器
可以使用内置序列化器,比如StringSerializer,IntegerSerializer,ByteArraySerializer等基本的序列化器。
也可以自定义,需要实现org.apache.kafka.common.serialization.Serializer接口

注:官方建议不要自定义序列化器,因为在消费端,需要使用同样的反序列化器。使用kafka自带的,可以避免很多问题。

<>生产者配置

kafka重要的配置文件有三个:
server.properties
broker.properties
consumer.properties
都可以去这里查看配置的意义和解释:官网配置 <https://kafka.apache.org/documentation/#configuration>

这里讲一下生产者重要的几个配置

* acks 如果 acks 被设为 0, 那么 broker立即返回响应;acks=1,需要等待leader写入成功;如果 acks 被设为
all,那么请求会被保存在一个叫作炼狱的缓冲 区里,直到首领发现所有跟随者副本都复制了消息,晌应才会被返回给客户端。
* buffer.memory 设置生产者内缓存区域的大小,生产者用它缓冲要发送到服务器的消息。
* compression.type 默认情况下,消息发送时不会被压缩,该参数可以设置成snappy、gzip或lz4对发送给broker的消息进行压缩
* retries 生产者从服务器收到临时性错误时,生产者重发消息的次数
* batch.size
发送到同一个partition的消息会被先存储在batch中,该参数指定一个batch可以使用的内存大小,单位是byte。不一定需要等到batch被填满才能发送
* linger.ms <http://linger.ms> 生产者在发送消息前等待linger.ms
<http://xn--linger-ol8i62w0sdj8k72xgqcn38ddhwqev5jrv87d.ms>
,从而等待更多的消息加入到batch中。如果batch被填满或者linger.ms达到上限,就把batch中的消息发送出去
* max.in.flight.requests.per.connection 生产者在收到服务器响应之前可以发送的消息个数
其中acks尤为重要,一定要记住他的意义。
<>Broker


前面讲到Broker其实可以当作一个服务器来理解,它上面有很多partition和partition的副本。那么partition和副本直接的数据是如何同步呢?如下图:



这个图信息量很大,可以看出,副本与leader之间的数据同步是副本去leader那里pull的过程。生产者发送消息到broker后,会根据配置的acks值,来决定何时返回。这个acks值,就是说这里的副本复制情况。

<>Partition Offset

Partition是消息的分区队列,一个topic写入不用的partition,写入过程中会更新offset,过程如下:


上面图中,有几个重要的名词:
1.ISR(In-sync Replication)
ISR中的副本都要同步leader中的数据,只有都同步完成了数据才认为是成功提交了,成功提交之后才能供外界访问。
在这个同步的过程中,数据即使已经写入也不能被外界访问,这个过程是通过LEO-HW机制来实现的。
2.OSR(Out-sync Replication)
OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower尽力的去同步leader,可能数据版本会落后。

最开始所有的副本都在ISR中,在kafka工作的过程中,如果某个副本同步速度慢于replica.lag.time.max.ms指定的阈值,则被踢出ISR存入OSR,如果后续速度恢复可以回到ISR中。
3.LEO
LogEndOffset:分区的最新的数据的offset,当数据写入leader后,LEO就立即执行该最新数据。相当于最新数据标识位。
4.HW

HighWatermark:只有写入的数据被同步到所有的ISR中的副本后,数据才认为已提交,HW更新到该位置,HW之前的数据才可以被消费者访问,保证没有同步完成的数据不会被消费者访问到。相当于所有副本同步数据标识位。

<>message物理结构



<>Consumer(消费者)

<>消费模式

谈到消费,一直以来,我们有两种消费模式,如图:


Kafka使用消费组(consumer group)统一 了上面两种消息模型 。Kafka使用队列模型时,它可以将处理
工作平均分配给消费组中的消费者成员;使用发布订阅模式时,它可以将消息 广播给多个消费组。
采用多个消费组结合多个消费者,既可以线性扩展消息的处理能力,也允许消息被多个消费组订阅。
kafka的消费模式:


Kafka采用消费组保证了“一个分区只可被消费组中的一个消费者所消费” ,这意味着:
(1)在一个消费组中,一个消费者可以消费多个分区 。
(2)不同的消费者消费的分区一定不会重复,所有消费者一起消费所有 的分区 。
(3)在不同消费组中,每个消费组都会悄费所有的分区 。
(4)同一个消费组下消费者对分区是互斥的,而不同消费组之间是共享的。

<>Consumer消费过程


由图中,我们知道,kafka的消费者客户端不断德调用poll()方法去轮询,从Broker中拉取消息。
topic下的一个分区只能被同一个consumer
group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据

<>Consumer提交偏移量




由图中,我们知道,新版本客户端中,消费者提交offset不再提交到zookeeper中,而是提交到Broker中的topic为_consumer_offset的分区上。

<>Consumer Rebalance

* 什么是rebalance?
rebalance本质上是一种协议,规定了一个consumer
group下的所有consumer如何达成一致来分配订阅topic的每个分区。比如某个group下有20个consumer,它订阅了一个具有100个分区的topic。正常情况下,Kafka平均会为每个consumer分配5个分区。这个分配的过程就叫rebalance。
* rebalance的触发条件有三种
1,组成员发生变更
2,订阅主题数发生变更
3,订阅主题的分区数发生变更
简单举个例子,假设目前某个consumer group下有两个consumer:
A和B,当第三个成员加入时,kafka会触发rebalance并根据默认的分配策略重新为A、B和C分配分区,如图所示


消费组分配partition过程如图:

注:在coordinator收集到所有成员请求前,它会把已收到请求放入一个叫purgatory(炼狱)的地方

新版kafka有两个协调器:消费者协调器(ConsumerCoordinator)和组协调器(GroupCoordinator),这里图中提到的是
消费者协调器?

<>kafka文件存储

Server.properties配置文件中,有log.dirs配置,指向的就是kafka文件存储位置。
我本机安装了一个单机版的kafka,文件如下:


文件目录解释:

* __consumer_offsets 用于存储offset的分区是由kafka服务器默认自动创建的
* cleaner-offset-checkpoint 存了每个log的最后清理offset
* log-start-offset-checkpoint 日志可以返回给Client的最开始边界
* recovery-point-offset-checkpoint 负责记录已经被写入磁盘的offset
* replication-offset-checkpoint 负责记录已经被复制到别的topic上的文件(HighWatermark的)
* testliyi-0 我创建的topic对应的分区0
<>Segment

kafka的partition其实就是一个个文件,然后kafka会根据配置把这些文件进行分段,每一段就叫做segment,如下图所示:


segment file组成:由2大部分组成,分别为index file和data
file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件.

segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

<>消息文件

如下图,展示了index文件和log文件的内容:



文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。

下面是书上截取的更详细的过程图:


<>zookeeper和kafka

zookeeper是什么,大家肯定不陌生,直接上图看下我的ppt:


<>kafka在zookeeper中的存储结构

上面可知zookeeper是存在于内存重的类似文件节点。那么,Kafka在zookeeper内部的存储结构是怎样的呢?一图说明一切:


<>zookeeper在Kafka中的作用

* 1,配置管理(high)
Topic的配置之所以能动态更新就是基于zookeeper做了一个动态全局配置管理。
* 2,命名服务(normal)
Broker将advertised.port和advertised.host.name
<http://xn--Brokeradvertised-qf93a.xn--portadvertised-jq7x.host.name>
,这两个配置发布到zookeeper上的zookeeper的节点上/brokers/ids/BrokerId(broker.id
<http://broker.id>),这个是供生产者,消费者,其它Broker跟其建立连接用的。
* 3,分布式通知(high)
比如分区增加,topic变动,Broker上线下线等均是基于zookeeper来实现的分布式通知。
* 4,集群管理和master选举(normal)
* 5,分布式锁 (high)
独占锁,用于Controller的选举。(临时顺序节点创建)
<>Kafka与zookeeper通信

通过查看kafka源码,我们知道,Kafka使用的是zkclient(
https://github.com/sgroschupf/zkclient)开源第三方客户端。通信方式为监听器。
<https://github.com/sgroschupf/zkclient%EF%BC%89%E5%BC%80%E6%BA%90%E7%AC%AC%E4%B8%89%E6%96%B9%E5%AE%A2%E6%88%B7%E7%AB%AF%E3%80%82%E9%80%9A%E4%BF%A1%E6%96%B9%E5%BC%8F%E4%B8%BA%E7%9B%91%E5%90%AC%E5%99%A8%E3%80%82>
主要有以下三种监听器:

* “主题改变的监听器”( TopicChangeListener)会监听/brokers/topics/的子节点变化事件。 当主题发生变 化时,监听器
会处理主题的增 加和删除事件 。 比如创 建主题时, Kafka会往 ZK 节点 /brokers/topics/添加子节点
/brokers/topics/[topic_name], 并触发监昕器调用 onNewTopic.creation()方法 。
* “分区改变的监听器”( PartitionModificationsListener)会监听 /brokers/topics/[topic]
节点的数据变化事件。 当主题的分区发生变化时,监昕器会处理分区增加的事件。 比如增加 分区时, Kafka会修改 ZK节点
/brokers/topics/[topic_name]的数据内容。
对于主题中新增的分区,监听器会调用onNewPartitionCreation()方法创建新的分区。
* “代理节点改变的监听器”( BrokerChangelistener)会监听/brokers/ids的子节点变化事件。
当代理节点发生变化时,监听器会处理代理节点的上线和下线事件 。 比如代理节点君机,
Kafka会删除/brokers/ids/[boker_id]子节点,并触发监昕器调用 onBrokerFailure()方法。 代理节点上线时,
Kafka会创建 /brokers/ids/[broker_id]子节点,并触发监昕器调用 onBrokerStart()方法。
<>Kafka高可用

<>Controller Failover

以下是基于ZK选举器选举主控制器的流程 (虽然每个代理节点都有一个控制器对象,但 Kafka集群只有一个主控制器 。)



<>Partition Failover

副本和分区的状态:


选举过程:


如上图所示,读取OSR作为主副本会造成数据丢失。所以kafka会定时检查是否有所有ISR都为存活状态

<>Broker Failover



<>kafka常见问题


以下两个问题经常出现在面试中,但是深入学习kafka之后,你会发现,能问出这种问题的一般都不怎么懂kafka,如果你深入了解Kafka,你可以从kafka的消息着手发问,而不是问一个什么条件都没有,半小时也讲不清楚的问题。

*
如何保证消息不丢失
默认保证at least once
挑战两种情况:
1,auto.commit.enable=true
2,auto.commit.enable=false

*
如何保证有序
分区有序。如何保证全局有序?(伪命题)

<>参考资料

* https://book.douban.com/subject/27665114/
<https://book.douban.com/subject/27665114/>
* https://book.douban.com/subject/27179953/
<https://book.douban.com/subject/27179953/>
* https://tech.meituan.com/kafka_fs_design_theory.html
<https://tech.meituan.com/kafka_fs_design_theory.html>
* http://www.cnblogs.com/cyfonly/p/5954614.html
<http://www.cnblogs.com/cyfonly/p/5954614.html>
* https://blog.csdn.net/lp284558195/article/details/80297208
<https://blog.csdn.net/lp284558195/article/details/80297208>

友情链接
ioDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信