Apache Kafka 学习笔记

像 Kafka 这一类的系统国外有专属的名字叫Messaging System,国内很多文献将其简单翻译成消息系统。但这种直白的翻译并不准确,因为它片面强调了消息主体的作用,而忽视了这类系统引以为豪的消息传递属性,就像引擎一样,具备某种能量转换传输的能力,所以翻译成消息引擎反倒更加贴切。

消息引擎系统

根据维基百科的定义,消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递。

An enterprise messaging system (EMS) or messaging system in brief is a set of published enterprise-wide standards that allows organizations to send semantically precise messages between computer systems.

https://en.wikipedia.org/wiki/Enterprise_messaging_system

上述通俗来说就是:系统 A 发送消息给消息引擎系统,系统 B 从消息引擎系统中读取 A 发送的消息。

最基础的消息引擎就是做这点事的!不论是上面哪个版本,它们都提到了两个重要的事实:

  • 消息引擎传输的对象是消息;

  • 如何传输消息属于消息引擎设计机制的一部分。

常见的消息引擎系统

ActiveMQ

官网:http://activemq.apache.org/

ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。

RabbitMQ

官网:https://www.rabbitmq.com/

RabbitMQ 是流行的开源消息队列系统,用 erlang 语言开发。RabbitMQ 是 AMQP(高级消息队列协议)的标准实现。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP 协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

Kafka

Apache Kafka 官网:http://kafka.apache.org/

Kafka 是 LinkedIn 开源的高吞吐量的分布式发布-订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据目前归属于 Apache 顶级项目。Apache Kafka 是 Kafka 系列版本中最出名的一个,其他 Kafka 还有 Confluent Kafka、Cloudera/Hortonworks Kafka、CDH/HDP Kafka 等。

RocketMQ

官网:http://rocketmq.apache.org/

RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

Pulsar

官网:https://pulsar.apache.org/

Pulsar 是 pub-sub 模式的分布式消息平台,拥有灵活的消息模型和直观的客户端 API。Pulsar 由雅虎开发并于 2016 年开源的下一代消息系统,目前是 Apache 软件基金会的孵化器项目。

ZeroMQ

官网地址:https://zeromq.org/

ZeroMQ 号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ 能够实现 RabbitMQ 不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这 MQ 能够应用成功的挑战。ZeroMQ 具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用 ZeroMQ 程序库,可以使用 NuGet 安装,然后你就可以愉快的在应用程序之间发送消息了。但是 ZeroMQ 仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter 的 Storm 0.9.0 以前的版本中默认使用 ZeroMQ 作为数据流的传输(Storm 从 0.9 版本开始同时支持 ZeroMQ 和 Netty 作为传输模块)。

常用消息引擎系统对比

Kafka vs. RabbitMQ vs. Pulsar
功能 RabbitMQ Kafka Pulsar 备注
存储功能 不支持 支持 支持
特点 富Broker, 傻消费者 傻Broker, 富消费者 傻Broker, 富消费者
数据私有-Exclusive 不支持 不支持 支持
数据共享-Shared 支持 支持 支持
故障切换-Failover 不支持 支持 支持
SubScription 不支持 不支持 支持
扩展 支持 支持 支持
高吞吐 支持 支持 支持
分布式 支持 支持 支持
数据回看 不支持 支持 支持
数据备份 不支持 支持 支持
数据路由 支持 不支持 不支持
生产者 支持 支持 支持
消费者 支持 支持 支持
Stream 不支持 支持 支持
RocketMQ vs. ActiveMQ vs. Kafka

数据来源:https://rocketmq.apache.org/docs/motivation/

消息传输格式

既然消息引擎是用于在不同系统之间传输消息的,如何设计待传输消息的格式是非常核心且重要的问题:

Kafka 的消息传输格式使用的是:结构化的纯二进制的字节序列。

消息传输协议

消息设计出来之后还不够,消息引擎系统还要设定具体的传输协议,即我用什么方法把消息传输出去。常见的有两种方法:

点对点模型

也叫消息队列模型。如果拿上面那个“民间版”的定义来说,那么系统 A 发送的消息只能被系统 B 接收,其他任何系统都不能读取 A 发送的消息。日常生活的例子比如电话客服就属于这种模型:同一个客户呼入电话只能被一位客服人员处理,第二个客服人员不能为该客户服务。

在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。

发布 / 订阅模型

与上面不同的是,它有一个主题(Topic)的概念,你可以理解成逻辑语义相近的消息容器。该模型也有发送方和接收方,只不过提法不同。发送方也称为发布者(Publisher),接收方称为订阅者(Subscriber)。和点对点模型不同的是,这个模型可能存在多个发布者向相同的主题发送消息,而订阅者也可能存在多个,它们都能接收到相同主题的消息。生活中的报纸订阅就是一种典型的发布 / 订阅模型。

在发布-订阅消息系统中,消息被持久化到一个 topic 中。与点对点消息系统不同的是,消费者可以订阅一个或多个 topic,消费者可以消费该 topic 中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。

总结:点对点模式中,生产者发送一条消息到queue,只有一个消费者能收到。发布/订阅模式中,发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。Kafka 同时上述这俩种消息传输模式(协议)。

使用消息系统的好处

1、解耦

在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2、冗余

有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

3、扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

4、灵活性 & 峰值处理能力

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

5、可恢复性

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

6、序保证

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。

7、缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。

8、异步通信

很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

使用消息引擎系统好处使用简单概括就是削峰填谷、异步解耦

认识 Kafka

Kafka简介

Kafka 在设计之初就旨在提供三个方面的特性:

  • 提供一套 API 实现生产者和消费者;
  • 降低网络传输和磁盘存储开销;
  • 实现高伸缩性架构。

开源之后的 Kafka 被越来越多的公司应用到它们企业内部的数据管道中,特别是在大数据工程领域,Kafka 在承接上下游、串联数据流管道方面发挥了重要的作用:所有的数据几乎都要从一个系统流入 Kafka 然后再流向下游的另一个系统中。

于是 Kafka 社区于0.10.0.0版本正式推出了流处理组件 Kafka Streams,也正是从这个版本开始,Kafka 正式“变身”为分布式的流处理平台,而不仅仅是消息引擎系统了。今天 Apache Kafka 是和 Apache Storm、Apache Spark 和 Apache Flink 同等级的实时流处理平台。

简言之,Kafka 是消息引擎系统(Messaging System),也是分布式流处理平台(Distributed Streaming Platform),而是能够实现精确一次(Exactly-once)处理语义的实时流处理平台。

作为流处理平台,Kafka 与其他主流大数据流式计算框架相比,优点如下:

  1. 更容易实现端到端的正确性(Correctness):要实现正确性和提供能够推导时间的工具。实现正确性是流处理能够匹敌批处理的基石。
  2. 可能助力 Kafka 胜出的第二点是它自己对于流式计算的定位。

Kafka 除用于消息引擎和流处理平台,而且还能够被用作分布式存储系统。Kafka 作者之一 Jay Kreps 曾经专门写过一篇文章阐述为什么能把 Kafka 用作分布式存储。不过这个功能建议只做了解就好了,目前还从没有谁在实际生产环境中,把 Kafka 当作持久化存储来用 。

术语

Kafka 属于分布式的消息引擎系统,它的主要功能是提供一套完备的消息发布与订阅解决方案。下图能够帮助你形象化地理解下文提到的术语概念:

broker

Kafka 集群包含一个或多个服务器,服务器节点称为:broker。

broker 存储 topic 的数据。如果某 topic 有 N 个 partition,集群有 N 个 broker,那么每个 broker 存储该 topic 的一个 partition。

如果某 topic 有 N 个partition,集群有(N + M)个broker,那么其中有 N 个 broker 存储该 topic 的一个 partition,剩下的 M 个 broker 不存储该 topic 的 partition 数据。

如果某 topic 有 N 个 partition,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。

消息

消息:Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。

对应图中的生产者发送到Broker的消息,绿色虚线表示消息发送方向。

主题

主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

分区

分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。

topic 中的数据分割为一个或多个 partition。每个 topic 至少有一个 partition。每个 partition 中的数据使用多个 segment 文件存储。partition 中的数据是有序的,不同 partition 间的数据丢失了数据的顺序。如果 topic 有多个 partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将 partition 数目设为 1。

消息位移

消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。

对应图中生产者发送到Broker的消息的位置。

副本

副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。

对应图中的蓝色消息矩形框。

生产者

生产者:Producer。向主题发布新消息的应用程序。

生产者即数据的发布者,该角色将消息发布到 Kafka 的 topic 中。broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中。生产者发送的消息,存储到一个 partition 中,生产者也可以指定数据存储的 partition。

消费者

消费者:Consumer。从主题订阅新消息的应用程序。

消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。

消费者位移

消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。

对应图中的消费者组中的消费者的绿色虚线指向的消息的位置。

消费者组

消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。

每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。

重平衡

重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

如果本章节的术语还是一知半解,建议阅读下文 Kafka 体系架构章节,希望帮助你我更好的理解 Kafka 的核心概念。

领导者

领导者:Leader。

每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 partition。

追随者

追随者:Follower。

每个 partition 有多个副本,除了 Leader,剩余的都是 Follower。

Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。如果 Leader 失效,则从 Follower 中选举出一个新的 Leader。当 Follower 与 Leader 挂掉、卡住或者同步太慢,leader 会把这个 follower 从in sync replicas(ISR)列表中删除,重新创建一个 Follower。

Kafka 的种类

架构师评估流处理平台的时候,框架本身的性能、所提供操作算子(Operator)的丰富程度固然是重要的评判指标,但框架与上下游交互的能力也是非常重要的。能够与之进行数据传输的外部系统越多,围绕它打造的生态圈就越牢固,因而也就有更多的人愿意去使用它,从而形成正向反馈,不断地促进该生态圈的发展。

就 Kafka 而言,Kafka Connect通过一个个具体的连接器(Connector),串联起上下游的外部系统。整个 Kafka 生态圈如下图所示。这张图中的外部系统只是 Kafka Connect 组件支持的一部分而已。目前还有一个可喜的趋势是使用 Kafka Connect 组件的用户越来越多,相信在未来会有越来越多的人开发自己的连接器。

目前市面上存在多个组织或公司发布不同的 Kafka。如同 Linux 系统一样,由于有不同的公司或组织开发出来 Linux 系统产品有不同的名称,比如我们熟知的 CentOS、RedHat、Ubuntu 等。

Apache Kafka

自 Kafka 开源伊始,它便在 Apache 基金会孵化并最终毕业成为顶级项目,它也被称为社区版 Kafka。 Apache Kafka 是后面其他所有 Kafka 产品的基础。也就是说,后面提到的发行版要么是原封不动地继承了 Apache Kafka,要么是在此之上扩展了新功能。

Confluent Kafka

Confluent 公司专注于提供基于 Kafka 的企业级流处理解决方案,该公司主要从事商业化 Kafka 工具开发,并在此基础上发布了 Confluent Kafka。Confluent Kafka 提供了一些 Apache Kafka 没有的高级特性,比如跨数据中心备份、Schema 注册中心以及集群监控工具等。

Confluent Kafka 是由 Confluent 公司出品,该公司主要从事商业化 Kafka 工具开发,并在此基础上发布了 Confluent Kafka。Confluent Kafka 提供了一些 Apache Kafka 没有的高级特性,比如跨数据中心备份、Schema 注册中心以及集群监控工具等。

Cloudera/Hortonworks Kafka

Cloudera 提供的 CDH 和 Hortonworks 提供的 HDP 是非常著名的大数据平台,里面集成了目前主流的大数据框架,能够帮助用户实现从分布式存储、集群调度、流处理到机器学习、实时数据库等全方位的数据处理。不管是 CDH 还是 HDP 里面都集成了 Apache Kafka,因此我把这两款产品中的 Kafka 称为 CDH Kafka 和 HDP Kafka。

产品比较

Apache Kafka
  • 优势:迭代速度快,社区响应度高,使用它可以让你有更高的把控度。

    它现在依然是开发人数最多、版本迭代速度最快的 Kafka。

  • 劣势:仅提供基础核心组件,缺失一些高级的特性。

    仅仅提供最最基础的组件,特别是对于前面提到的 Kafka Connect 而言,社区版 Kafka 只提供一种连接器,在实际使用过程中需要自行编写代码实现。另外没有提供任何监控框架或工具。

    好消息是目前有一些开源的监控框架可以帮助用于监控 Kafka(比如 Kafka manager)。

如果仅仅需要一个消息引擎系统亦或是简单的流处理应用场景,同时需要对系统有较大把控度,那么推荐使用 Apache Kafka。

Confluent Kafka
  • 优势:集成了很多高级特性且由 Kafka 原班人马打造,质量上有保证。

    Confluent Kafka 目前分为免费版和企业版两种。

    免费版:和 Apache Kafka 非常相像,除了常规的组件之外,还包含 Schema 注册中心和 REST proxy 两大功能。能帮助开发者集中管理 Kafka 消息格式以实现数据前向 / 后向兼容。免费版包含了更多的连接器,它们都是 Confluent 公司开发并认证过的。

    企业版:用开放 HTTP 接口的方式允许你通过网络访问 Kafka 的各种功能。拥有强大的跨数据中心备份和集群监控功能。

    多个数据中心之间数据的同步以及对集群的监控历来是 Kafka 的痛点,Confluent Kafka 企业版提供了强大的解决方案帮助开发者“干掉”它们。

  • 劣势:相关文档资料不全,普及率较低,没有太多可供参考的范例。

    Confluent 公司暂时没有发展国内业务的计划,相关的资料以及技术支持都很欠缺,很多国内 Confluent Kafka 使用者甚至无法找到对应的中文文档,因此目前 Confluent Kafka 在国内的普及率是比较低的。

一言以蔽之,如果需要用到 Kafka 的一些高级特性,那么推荐使用 Confluent Kafka。

CDH/HDP Kafka
  • 优势:大数据云公司提供的 Kafka,内嵌 Apache Kafka。操作简单,节省运维成本。

    通过便捷化且非常友好的界面操作将 Kafka 的安装、运维、管理、监控全部统一在控制台中,通常不需要进行任何配置就能有效地监控 Kafka。

  • 劣势:缺陷在于把控度低,演进速度较慢。

    由于监控封装得太好了,以至于使用者对下层的 Kafka 集群一无所知。并且发布周期和 Apache Kafka 更新周期不同步。

简单来说,如果需要快速地搭建消息引擎系统,或者需要搭建的是多框架构成的数据平台且 Kafka 只是其中一个组件,那么推荐使用这些大数据云公司提供的 Kafka。

Kafka 体系架构

如上图所示,一个典型的 Kafka 体系架构包括若干 Producer(可以是服务器日志,业务数据,页面前端产生的 page view 等等),若干 broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干 Consumer (Group),以及一个 Zookeeper 集群。

  • Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 consumer group 发生变化时进行 rebalance。

  • Producer 使用 push(推)模式将消息发布到 broker。

  • Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。

名词解释

名称 解释
Broker 消息中间件处理节点,一个 Kafka 节点就是一个 broker,一个或者多个 Broker 可以组成一个 Kafka 集群
Topic Kafka 根据 topic 对消息进行归类,发布到 Kafka 集群的每条消息都需要指定一个 topic
Producer 消息生产者,向 Broker 发送消息的客户端
Consumer 消息消费者,从 Broker 读取消息的客户端
ConsumerGroup 每个 Consumer 属于一个特定的 Consumer Group,一条消息可以发送到多个不同的 Consumer
Partition 物理上的概念,一个 topic 可以分为多个 partition,每个 partition 内部是有序的

由上述概念可以了解到:

  • n 个 Broker 组成一个 Kafka 集群,通常一台机器部署一个 Kafka 实例,一个实例挂了其他实例仍可以使用,体现了Kafka 的高可用性。

  • 一个 partition 拥有多个 副本,并且这些副本在的其他 broker 上,体现强容灾能力。

  • partition 在机器磁盘上以 log 体现,采用顺序追加日志的方式添加新消息,实现高吞吐量。

Topic(主题) & Partition(分区)

Kafka 中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到 Kafka 集群中的每一条消息都要指定一个主题),而消息者负责订阅主题并进行消费。

主题是逻辑上的概念,主题的物理层面就是分区,一个分区只属于单个主题,很多时候把分区称为主题分区(Topic-Partition)。

一个 topic 可以认为一个一类消息,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log文 件。任何发布到此 partition 的消息都会被追加到 log 文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。

每条消息都被 append 到 partition 中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保证)。

每一条消息被发送到 broker 中,会根据 partition 规则选择被存储到哪一个 partition。如果 partition 规则设置的合理,所有消息可以均匀分布到不同的 partition 里,这样就实现了水平扩展。(如果一个 topic 对应一个文件,那这个文件所在的机器 I/O 将会成为这个 topic 的性能瓶颈,而 partition 解决了这个问题)。在创建 topic 时可以在$KAFKA_HOME/config/server.properties中指定这个 partition 的数量(如下所示),当然可以在 topic 创建之后去修改 partition 的数量。

1
2
3
4
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=4

默认值是:1,在生产环境注意一定要设置合理值。

简言之,为提升 Kafka 接收消息的吞量,将消息的主题设置成了分区的物理概念,这些分区都属于同一个主题,对外接收该主题的消息。

在发送一条消息时,可以指定这个消息的 key,producer 根据这个 key 和 partition 机制来判断这个消息发送到哪个 partition。partition 机制可以通过指定 producer 的partition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。

上面动态图表示的是将产生相同主题的消息轮询发送到不同的该主题的主题分区中。

Replication(分区的副本)

Kafka 的高可靠性的保障来源于其健壮的副本(replication)策略。通过调节其副本相关参数,可以使得 Kafka 在性能和可靠性之间运转的游刃有余。Kafka 从 0.8.x 版本开始提供 partition 级别的复制,replication 的数量可以在$KAFKA_HOME/config/server.properties中配置(default.replication.refactor)。

为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中一个 broker 失效情况下仍然保证服务可用。在 Kafka 中发生复制时确保 partition 的日志能有序地写到其他节点上,N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 处理 partition 的所有读写请求,与此同时,follower 会被动定期地去复制 leader上的数据。

如下图所示,Kafka 集群中有 4 个 broker,某 topic 有 3 个 partition,且复制因子即副本个数也为 3:

Kafka 提供了数据复制算法保证,如果 leader 发生故障或挂掉,一个新 leader 被选举并被接受客户端的消息成功写入。Kafka 确保从同步副本列表中选举一个副本为 leader,或者说 follower 追赶 leader 数据。leader 负责维护和跟踪ISR(In-Sync Replicas的缩写,表示副本同步队列)中所有 follower 滞后的状态。当 producer 发送一条消息到 broker 后,leader 写入消息并复制到所有 follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的 follower 限制,重要的是快速检测慢副本,如果follower “落后”太多或者失效,leader 将会把它从 ISR 中删除。

默认情况下 Kafka 的 replica 数量为 1,即每个 partition 都有一个唯一的 leader,为了确保消息的可靠性,通常应用中将其值(由 broker 的参数offsets.topic.replication.factor指定)大小设置为大于 1,比如 3。

ISR

ISR、OSR、AR

ISR(In-Sync Replicas)是指副本同步队列。副本数对 Kafka 的吞吐率是有一定的影响,但极大的增强了可用性。

  • 分区中的所有副本(replicas)统称为Assigned Replicas,即AR
  • 所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),因此 ISR 是 AR 中的一个子集。
  • leader 副本负责维护和跟踪 ISR 集合中所有 follower 副本的滞后状态,当 follower 副本落后太多或者失效时,leader 副本会把 follower 剔除出ISR
  • 从 ISR 列表中被剔除的 follower 副本会存入OSR(Outof-Sync Replicas)列表,新加入的 follower 也会先存放在OSR中。
  • 如果 OSR 集合中有 follower 副本“追上”了 leader 副本,那么 leader 副本会把它从 OSR 集合中转移到 ISR 集合中。
  • 当 leader 副本发生故障时,只有在 ISR 集合中的副本才有资格被选举为新的 leader,而在 OSR 集合中的副本则没有任何机会。默认会从 ISR 集合中选择第一个 follower 副本当新的 leader 副本。
  • AR = ISR + OSR,正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR = ISR,OSR 集合为空。

follower 进入 ISR 列表条件

能够进入 ISR 列表中的条件是可以进行参数配置的:

replica.lag.time.max.ms 默认值:10000,单位为:毫秒

该配置表示如果一个 follower 在有一个时间窗口内(默认值为 10 秒)没有发送任意 fetch 请求,leader 就会把这个 follower 从 ISR(in-sync replicas)移除,并存入 OSR 集合。

Kafka 0.9.0.0版本后移除了replica.lag.max.messages参数,只保留了replica.lag.time.max.ms作为 ISR 中副本管理的参数。官方文档说明参见:http://kafka.apache.org/090/documentation.html#upgrade_9_breaking

Kafka 0.9.0.0版本后移除了 replica.lag.max.messages 参数,只保留了 replica.lag.time.max.ms 作为 ISR 中副本管理的参数。为什么这样做呢?

replica.lag.max.messages 表示当前某个副本落后 leader 的消息数量超过了这个参数的值,那么 leader 就会把 follower 从 ISR 中删除。

假设设置 replica.lag.max.messages = 4,那么如果 producer 一次传送至 broker 的消息数量都小于 4 条时,因为在 leader 接受到 producer 发送的消息之后而 follower 副本开始拉取这些消息之前,follower 落后 leader 的消息数不会超过 4 条消息,故此没有 follower 移出 ISR,所以这时候 replica.lag.max.message 的设置似乎是合理的。

但是 producer 发起瞬时高峰流量,producer 一次发送的消息超过 4 条时,也就是超过 replica.lag.max.messages,此时 follower 都会被认为是与 leader 副本不同步了,从而被踢出了 ISR。

但实际上这些 follower 都是存活状态的且没有性能问题。那么在之后追上 leader,并被重新加入了 ISR。于是就会出现它们不断地剔出 ISR 然后重新回归 ISR,这无疑增加了无谓的性能损耗。

而且这个参数是 broker 全局的。设置太大了,影响真正“落后” follower 的移除;设置的太小了,导致 follower 的频繁进出。无法给定一个合适的 replica.lag.max.messages 的值,故此,新版本的 Kafka 移除了这个参数。

值得注意的是:ISR 列表中包括:leader 和 follower。

Unclean 领导者选举(Unclean Leader Election)

当 leader 的所有副本都被剔除到了 OSR 列表中,此时 ISR 列表中只剩下 leader 了,此时 leader 自己“挂了”,那么 ISR 集合为空。此时该怎么选举 leader 呢?

Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。

Broker 端参数unclean.leader.election.enable控制是否允许 Unclean 领导者选举。

丢失数据风险

场景:

1、follower2 不在 ISR 中,但正在努力“赶上”。

2、ISR 中所有副本意外“挂掉”。

3、开启了 Unclean 领导者选举,所以 follower2 成了新的 leader。

4、原来在 ISR 里的副本都成了新 leader 的 follower,需要重新加入 ISR 中,“超长”部分被截取了,导致数据不一致。

如上图所示,新的 follower 副本需要删除消息 4 和消息 5,之后才能与新的 leader 副本进行同步。之后新的 follower 副本和新的 leader 副本组成了新的 ISR 集合,参考下图。

原本客户端已经成功的写入了消息 4 和消息 5,而在发生日志截断之后就意味着这 2 条消息就丢失了,并且新的 follower 副本和新的 leader 副本之间的消息也不一致。

开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。

从 Kafka 0.11.0.0 版本开始unclean.leader.election.enable参数的默认值由原来的true改为false

原文参见:

http://kafka.apache.org/documentation/#upgrade_1100_notable

Unclean leader election is now disabled by default. The new default favors durability over availability. Users who wish to to retain the previous behavior should set the broker config unclean.leader.election.enable to true.

读者可以根据你的实际业务场景决定是否开启 Unclean 领导者选举。但是笔者强烈建议不要开启它,毕竟我们还可以通过其他的方式来提升高可用性。

Kafka文件存储机制

Kafka 中消息是以 topic 进行分类的,生产者通过 topic 向 Kafka broker 发送消息,消费者通过 topic 读取数据。

然而 topic 在物理层面又能以 partition 为分组,一个 topic 可以分成若干个 partition,那么 topic 以及 partition 又是怎么存储的呢?partition 还可以细分为 segment,一个 partition 物理上由多个 segment 组成,那么这些 segment 又是什么呢?

Parition

为了便于说明问题,假设这里只有一个 Kafka 集群,且这个集群只有一个 Kafka broker,即只有一台物理机。在这个 Kafka broker 中配置(默认配置):

1
2
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

以此来设置 Kafka 消息文件存储目录,与此同时创建一个topic:my-topic,partition 的数量为 3:

1
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic

那么我们此时可以在/tmp/kafka-logs目录中可以看到生成了 3 个目录:

1
2
3
drwxr-xr-x 2 root root 4096 Apr 10 16:10 my-topic-0
drwxr-xr-x 2 root root 4096 Apr 10 16:10 my-topic-1
drwxr-xr-x 2 root root 4096 Apr 10 16:10 my-topic-2

在 Kafka 文件存储中,同一个 topic 下有多个不同的 partition,每个 partiton 为一个目录,partition 的名称规则为:

1
topic名称 + 有序序号

第一个序号从 0 开始计,最大的序号为 partition 数量减 1,partition 是实际物理上的概念,而 topic 是逻辑上的概念。

Segment

如果就以 partition 为最小存储单位,我们可以想象当 Kafka producer 不断发送消息,必然会引起 partition 文件的无限扩张,这样对于消息文件的维护以及已经被消费的消息的清理带来严重的影响,所以这里以 segment 为单位又将 partition 细分。

  • 每个 partition(目录)相当于一个巨型文件被平均分配到多个大小相等的 segment(段)数据文件中(每个 segment 文件中消息数量不一定相等)这种特性也方便 old segment 的删除,即方便已被消费的消息的清理,提高磁盘的利用率。
  • 每个 partition 只需要支持顺序读写就行,segment 的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours} 等若干参数)决定。
文件组成

segment 文件由两部分组成,分别为.index文件和.log文件,分别表示为 segment 索引文件和数据文件。这两个文件的命令规则为:

文件名规则

partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset值,数值大小为 64 位(long 大小),20 位数字字符长度,没有数字用 0 填充。

实验:创建一个主题仅包含 1 个分区 ,设置每个 segment 大小为 500MB,并启动 producer 向 Kafka broker 写入大量数据,如下图所示 segment 文件列表形象说明了上述两个规则:

以上述图中一对segment file文件为例,说明segment.index.log 文件的对应关系物理结构如下:

上述图中索引文件存储大量元数据,数据文件存储大量消息,索引文件(.index)中元数据指向对应数据文件中 message 的物理偏移地址

其中以索引文件中元数据[3, 497]为例,依次在数据文件中表示第 3 个 message(在全局 partiton 表示第 368772 个 message)、以及该消息的物理偏移地址为 497。

如何从 partition 中通过 offset 查找 message 呢?

以上图为例,读取 offset = 368777 的消息。

  • 第一步:查找 segment 文件

    其中 00000000000000000000.index 为最开始的文件,第二个文件为 00000000000000368769.index(起始偏移为:368769 + 1 = 368770),而第三个文件为00000000000000737337.index(起始偏移为 737337 + 1 = 737338),所以这个 offset = 368777 就落到了第二个文件之中。

    其他后续文件可以依次类推,以其实偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。

    其次根据 00000000000000368769.index 文件中的 [8,1325] 定位到 00000000000000368769.log 文件中的1325的位置进行读取。

  • 第二步:通过 segment file 查找 message。

    通过第一步定位到 segment file,当 offset=368776 时,依次定位到00000000000000368769.index 的元数据([8,1686])物理位置(1686)和 00000000000000368769.log 的物理偏移地址(1686),然后再通过 00000000000000368769.log 顺序查找直到 offset=368776 为止。

要是读取 offset=368777 的消息,从 00000000000000368769.log 文件中的 1325 的位置进行读取,那么怎么知道何时读完本条消息,否则就读到下一条消息的内容了?

这个就需要联系到消息的物理结构了,消息都具有固定的物理结构,包括:

offset(8 Bytes)

消息体的大小(4 Bytes)

crc32(4 Bytes)

magic(1 Byte)

attributes(1 Byte)

key length(4 Bytes)

key(K Bytes)

payload(N Bytes)

等等字段,可以确定一条消息的大小,即读取到哪里截止。

HW & LEO

LEO(Log End Offset)

LEO 是日志末端位移 Log End Offset 的缩写,它标识当前日志文件中下一条待写入消息的 offset。LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加 1。分区 ISR 集合中的每个副本都会维护自身的 LEO。

HW(High Watermark)

HW 俗称高水位,High Watermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,它标识了一个特定的消息偏移量(offset)。

consumer 最多只能消费到 HW 所在的位置之前的消息。另外每个 replica 都有HW,leader 和 follower 各自负责更新自己的 HW 的状态。更精确表述为:consumer无法消费分区下leader副本中位移值大于分区 HW的任何消息。这里需要特别注意分区 HW 就是 leader 副本的 HW 值

对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于来自内部 broker 的读取请求,没有 HW 的限制。

对于 HW 机制有存在丢失数据的风险,因此在 Kafka 0.11 引入了leader epoch来取代 HW 值。感兴趣的读者可以移步至:Kafka水位(high watermark)与leader epoch的讨论,该文章细致了讲述了 leader 副本和 follower 副本之前的 HW 及 LEO 的更新触发时机。

数据可靠性

生产端

当 producer 向 leader 发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:

  • 1(默认):这意味着 producer 在 ISR 中的 leader 已成功收到数据并得到确认。如果 leader 宕机了,则会丢失数据。
  • 0:这意味着 producer 无需等待来自 broker 的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
  • -1:producer 需要等待 ISR中 的所有 follower 都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当 ISR 中只有 leader 时(前面 ISR 那一节讲到,ISR 中的成员由于某些情况会增加也会减少,最少就只剩一个leader),这样就变成了acks=1的情况。

因此,如果要提高数据的可靠性,在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数(可以在 broker 或者 topic 层面进行设置))的配合,这样才能发挥最大的功效。

min.insync.replicas这个参数设定 ISR 中的最小副本数是多少,默认值为 1。

当且仅当request.required.acks参数设置为-1时,此参数才生效。

如果 ISR 中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常:

1
org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required
request.required.acks=1

producer 发送数据到 leader,leader 写本地日志成功,返回客户端成功;此时 ISR 中的副本还没有来得及拉取该消息,leader 就宕机了,那么此次发送的消息就会丢失。

request.required.acks=-1

同步的发送模式(Kafka默认为同步,即producer.type=sync),replication.factor>=2min.insync.replicas>=2的情况下,不会丢失数据。

有两种典型情况。acks=-1的情况下(如无特殊说明,以下 acks 都表示为参数request.required.acks),数据发送到 leader,ISR 的 follower 全部完成数据同步后,leader 此时挂掉,那么会选举出新的 leader,数据不会丢失。

acks=-1 的情况下,数据发送到 leader 后 ,部分 ISR 的副本同步,leader 此时挂掉。比如 follower1 和 follower2 都有可能变成新的 leader,producer 端会得到返回异常,producer 端会重新发送数据,数据可能会重复。

当然上图中如果在 leader crash 的时候,follower2 还没有同步到任何数据,而且 follower2 被选举为新的 leader 的话,这样消息就不会重复。

综上:

producer 在同步的发送模式下(kafka默认),replication.factor>=2min.insync.replicas>=2的情况下,不会丢失数据。

producer 在异步的发送模式下,设置request.required.acks=-1参数,可以保证不丢失数据,但可能存在发送重复数据风险。

消费端

对于消息的可靠性,很多人都会忽视消费端的重要性,如果一条消息成功地写入 Kafka,并且也被 Kafka 完好的保存,而在消费时,由于某些疏忽造成没有消费到这条消息,那么对于应用来说,这条消息也是丢失的。

enable.auto.commit参数的默认值是 true,及开启自动位移提交的功能。对于高可靠性要求的应用来说,这种默认提交位移方式是不可取的,因此需要将enable.auto.commit参数设置为 false 来执行手动位移提交。

手动提交位移建议遵循一个原则:如果消息没有成功被消费,那么就不能提交对应的消费位移。对高可靠要求的应用来说,宁愿重复消费也不应该因为消费异常而导致消息丢失。

有时候,由于应用解析消息的异常,可能导致部分消息一直不能成功被消费,那么这时候为了不影响整体消费的进度,可以将这类消息暂存到死信队列中,以便后续故障排查。

死信队列:由于某些原因消息无法被正确地投递,为了确保消息不会被无故地丢弃,一般将其置于一个特殊角色的队列,这个队列一般称为死信队列。

Java API - 生产端

KafkaProducer 实例

apache 提供了工具类,方便开发者设置配置参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public static Properties initConfig() {
Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
return props;
}

public static void testKafkaProducer() {
Properties properties = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
}

KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将 KafkaProducer 实例进行池化来供其他线程调用。一般选用public KafkaProducer(Properties properties)这个构造方法来创建 KafkaProducer 实例。

ProducerRecord 对象

ProducerRecord 的构造方法有很多种:

1
2
3
4
5
6
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, V value)

第一种构造函数最丰富,开发者可以自定义很多消息对象附属信息。最后一种最简单,只需要 topic 和 value 即可。

消息发送的方式

KafkaProducer 的 send() 方法并非是 void 类型,而是Future<RecordMetadata> 类型,send() 方法有 2 个重载方法,具体定义如下:

1
2
3
4
5
6
// Asynchronously send a record to a topic,异步发送消息
Future<RecordMetadata> send(ProducerRecord<K, V> record);

// Asynchronously send a record to a topic,异步发送消息带回调
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

发后即忘(fire and forget)

这种模式只管往 Kafka 发送消息而不关系消息是否正确到达,这种性能最高,但是可靠性最差,因为一旦发生不可重试异常的时候,就会造成消息丢失:

1
producer.send(record);
同步发送(sync)

利用返回的 Future 对象实现:

1
2
3
4
5
try {
producer.send(record).get();
} catch (ExecutionException | InterruptedException exception) {
exception.printStackTrace();
}

send() 方法本身是异步的,该方法返回的 Future 对象可以使调用方稍后获得发送的结果。上述代码就是直接链式调用了 get() 方法来阻塞等嗲 kafka 的响应,直到消息发送成功或者发生异常。

也可以不直接调用 get() 方法,比如下面这种也是同步发送方式:

1
2
3
4
5
6
7
8
9
10
try {
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();

String topic = metadata.topic();
long offset = metadata.offset();

} catch (ExecutionException | InterruptedException exception) {
exception.printStackTrace();
}

这样可以获取一个 RecordMetadata 对象,在 RecordMetadata 对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。

另外,sned() 方法的返回类型是 Future 类型,开发者可以使用 Java 语言层面的技巧来丰富应用的实现,比如使用 Future 中的 get(long timeout, TimeUnit unit) 方法实现可超时的阻塞。

同步发送不会像“发后即忘”的方式直接造成消息的丢失,但是这种方式的性能会很差,需要阻塞等待本条消息发送完之后才能发送下一条。

异步发送(async)

在开发中最常用的方式就是使用 send() 方法中带回调信息的异步发送:即指定一个 Callback 的回调函数,kafka 在返回响应时调用该函数来实现异步的发送确认:

1
2
3
4
5
6
7
8
9
10
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null) {
exception.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});

注意上述代码中,onCompletion() 方法的俩个参数是互斥的:

  • 消息发送成功时,metadata 不为 null 而 exception 为 null
  • 消息发送失败时,metadata 为 null 而 exception 不为 null

对于同一个分区来说,如果消息 record1 于 record2 之前发送,那么 KafkaProducer 可以保证对应的 callback1 在 callback2 之前调用,也就是说,回调函数的调用可以保证同一个分区内有序。

1
2
producer.send(record1, callback1);
producer.send(record2, callback2);

关闭资源

KafkaProducer 发送完一批消息之后,注意及时关闭资源,KafkaProducer 提供了俩个 close() 方法,第二个是带超时时间的关闭资源方法:

1
2
void close();
void close(long timeout, TimeUnit timeUnit);

发送异常

KafkaProducer 一般会发生两种类型的异常:可重试异常和不可重试异常。

可重试异常

常见有:NetWorkException、 LeaderNotAvailableException、UnknownTopicOrPartitionException、UnknownTopicOrPartitionException、NotCoordinatorException 等。下图展示了 kafka 可以重试的异常,只有超过重试阀值的时候才会给开发者抛出异常。

可重试异常的配套参数为:retries,只要在规定的重试次数内自行恢复的,就会抛出异常。retries 参数默认值为 0,配置参考:

1
props.put(ProducerConfig.RETRIES_CONFIG, 10);

Java API - 消费端

poll() 方法

Kafka 中的消费是基于拉模式的。代码形如:

1
2
3
4
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

}

上述代码可以看出,Kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll() 方法,而 poll() 方法返回的是所订阅的主题(分区)上的一组消息。

对于 poll() 方法而言:

  • 如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空。

  • 如果订阅的所有分区中都没有可供消费的消息,那么 poll() 方法返回为空的消息集合。

poll() 方法的具体定义如下:

1
public ConsumerRecords<K, V> poll(final Duration timeout);

timeout 参数表示超时时间参数,用来控制 poll() 方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发成阻塞。

同步提交

消费端对于自动提交存在丢失数据的风险,本章节不再举代码示例。重点关注手动提交方式。

手动提交可以细分为同步提交和异步提交,对应 kafkaConsumer 中的commitSync()commitAsync()两种类型的方法。

1
public void commitSync();

同步提交的简单用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.client.id.demo");

//设置手动提交消息偏移
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// do some logical processing
}
consumer.commitSync();
}
} catch (Exception exception) {
Log.error("occur exception", exception);
} finally {
consumer.close();
}

上述代码示例中,拉取到的消息是批量的,消费过程是按顺序一条条消息做相应的逻辑处理,然后对整个消息集做同步提交。等待全部消费完毕,再手动提交本批次的偏移量。针对上面的示例可以修改为批量处理 + 批量提交的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}

if (buffer.size() >= minBatchSize) {
// do some logical processing with buffer
consumer.commitSync();
buffer.clear();
}
}
} catch (Exception exception) {
Log.error("occur exception", exception);
} finally {
consumer.close();
}

上述代码示例中,将拉取到的消息缓存到 buffer,等积累到足够多的时候,也就是示例中大于等于 200 个消息时候,再做相应的处理,最后再做批量提交。

以上两个同步提交示例都存在重复消费的问题,如果程序在消费完数据,但还没来得及提交位移的时候出现了崩溃,那么待恢复之后只能从上一次位移提交的地方拉取消息,由此在俩次位移提交的窗口中出现了重复消费的现象。

consumer.commitSync() 方法会根据 poll() 方法拉取最新位移来进行提交,只要没有发生不可恢复的错误,它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,比如 CommitFailedException、WakeupException 等,我们可以将其捕获并做针对性处理。

对于 commitSync() 无参方法,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果需要更细粒度的提交,就需要使用带参数的 commitSync() 方法:

1
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets);

该方法提供了一个 offset 参数,用来提交指定分区的位移。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// do some logical processing
long offset = record.offset();
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1)));
}
}
} catch (Exception exception) {
log.error("occur exception", exception);
} finally {
consumer.close();
}
按照分区消费并提交位移

实际应用很少会每消费一条消息就提交一次消费位移。因为同步提交本身就很性能,加上一条条阻塞式提交,性能会更低。更多时候是根据分区的粒度划分提交位移的界限。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

for (TopicPartition partition : records.partitions()) {

List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);

for (ConsumerRecord<String, String> record : records) {
// do some logical processing
}

long lastConsumerOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumerOffset + 1)));
}
}
} catch (Exception exception) {
log.error("occur exception", exception);
} finally {
consumer.close();
}

异步提交

异步提交使用 commitAsync() 方法,该方法在执行的时候消费者线程不会被阻塞,该方法有 3 个重载方法:

1
2
3
void commitAsync()
void commitAsync(OffsetCommitCallback callback)
void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

第二个和第三个方法里的回调函数和生产端发送回调函数类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// do some logical processing
}

consumer.commitAsync(new OffsetCommitCallback() {

@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if(exception == null) {
System.out.println(offsets) ;
} else {
log.error("occur exception", exception);
}

}
});
}

注意:异步提交消费位移发生异常的时候,使用回调函数捕获提交异常,如果做重试提交,那么会可能产生重复消费问题。比如:第一个异步提交位移在某一时刻失败了,此时第二个异步提交位移提交了新的位移,由于有重试的机制,那么第一个位移(相比第二个位移是旧的)提交之后,第三次消费的时候,又是从第一次消费处开始消费。

参考资料

消息队列技术介绍

Kafka、RabbitMQ、RocketMQ消息中间件的对比 —— 消息发送性能

Java消息中间件

10分钟搞懂:95%的程序员都拎不清的分布式消息队列中间件

消息中间件(一)MQ详解及四大MQ比较

Kafka各个版本差异汇总

Kafka设计解析(一)- Kafka背景及架构介绍

Kafka的时代已经过去了,未来是Pulsar的吗?

Thorough Introduction to Apache Kafka

Developing a Deeper Understanding of Apache Kafka Architecture

Using Apache Kafka as a Scalable, Event-Driven Backbone for Service Architectures

Apache Kafka消息格式的演变(0.7.x~0.10.x)

Kafka消息时间戳及压缩消息对时间戳的处理

kafka数据可靠性深度解读

图解Apache Kafka消息偏移量的演变(0.7.x~0.10.x)

Kafka学习之路 (一)Kafka的简介

Kafka学习之路 (二)Kafka的架构

Kafka学习之路 (三)Kafka的高可用

Kafka 数据可靠性深度解读

kafka高可用原理

kafka 高可用

Kafka水位(high watermark)与leader epoch的讨论

Kafka文件存储机制那些事

Kafka参数图鉴——unclean.leader.election.enable

updated updated 2024-01-01 2024-01-01
本文结束感谢阅读

本文标题:Apache Kafka 学习笔记

本文作者:woodwhales

原始链接:https://woodwhales.cn/2019/12/01/055/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

0%