kafka架构之Producer、Consumer详解

Producer负载均衡

生产者将数据直接发送到作为分区领导者的broker,而没有任何干预路由层。 为了帮助生产者做到这一点,所有 Kafka 节点都可以在任何给定时间回答有关哪些服务器处于活动状态以及主题分区的领导者在哪里的元数据请求,以允许生产者适当地引导其请求。

客户端控制将消息发布到哪个分区。 这可以随机完成,实现一种随机负载平衡,或者可以通过一些语义分区函数来完成。 我们通过允许用户指定一个键来进行分区并使用它来散列到一个分区(如果需要,还有一个选项可以覆盖分区功能),我们公开了语义分区的接口。 例如,如果选择的键是用户 ID,那么给定用户的所有数据都将发送到同一个分区。 这反过来将允许消费者对他们的消费做出局部性假设。 这种分区风格被明确设计为允许在消费者中进行局部敏感处理。

Producer异步发送

批处理是效率的重要驱动因素之一,为了启用批处理,Kafka 生产者将尝试在内存中积累数据并在单个请求中发送更大的批次。 批处理可以配置为累积不超过固定数量的消息,等待时间不超过某个固定的延迟限制(例如 64k 或 10 毫秒)。 这允许累积更多要发送的字节,并且服务器上很少有较大的 I/O 操作。 这种缓冲是可配置的,并提供了一种机制来权衡少量额外的延迟以获得更好的吞吐量。

Consumer

Kafka 消费者的工作方式是向它想要消费的分区的broker发出“获取”请求。 消费者在每个请求的日志中指定其偏移量,并从该位置开始接收一个日志块。 因此,消费者对该位置具有显着的控制权,并且可以在需要时将其倒回以重新消费数据。

关于Producer Pull和Push的讨论

我们最初考虑的一个问题是消费者应该从broker那里Pull(提取)数据还是broker应该将数据Push(推送)给消费者。在这方面,Kafka 遵循更传统的设计,被大多数消息传递系统共享,其中数据从生产者推送到broker,并由消费者从broker拉取。一些以日志为中心的系统,例如 Scribe 和 Apache Flume,遵循非常不同的基于推送的路径,将数据推送到下游。这两种方法各有利弊。然而,基于推送的系统难以处理不同的消费者,因为broker控制数据传输的速率。目标通常是让消费者能够以最大可能的速率消费;不幸的是,在推送系统中,这意味着当消费率低于生产率时,消费者往往会不知所措(本质上是拒绝服务)。基于拉动的系统具有更好的特性,即消费者只是落后并在可能的时候赶上。这可以通过某种退避协议来缓解,消费者可以通过该协议表明它已经不堪重负,但是让传输速率充分利用(但永远不会过度利用)消费者比看起来更棘手。以前以这种方式构建系统的尝试使我们采用了更传统的拉式模型。

基于拉式系统的另一个优点是它有助于对发送给消费者的数据进行积极的批处理。 基于推送的系统必须选择要么立即发送请求,要么积累更多数据,然后在不知道下游消费者是否能够立即处理它的情况下发送。 如果调整为低延迟,这将导致一次发送一条消息,但传输最终会被缓冲,这是一种浪费。 基于拉取的设计解决了这个问题,因为消费者总是在其在日志中的当前位置之后(或达到某个可配置的最大大小)拉取所有可用消息。 因此,可以在不引入不必要的延迟的情况下获得最佳批处理。

朴素的基于拉取的系统的不足之处在于,如果broker没有数据,消费者最终可能会在一个紧密的循环中轮询,有效地忙于等待数据到达。 为了避免这种情况,我们在拉取请求中设置了参数,允许消费者请求在“长轮询”中阻塞,等待数据到达(并且可以选择等待给定数量的字节可用以确保大传输大小)。

您可以想象其他可能的设计,它们只是端到端的拉动。 生产者会在本地写入本地日志,broker会从中提取,而消费者会从brokers中提取。 通常建议使用类似类型的“存储转发”生产者。 这很有趣,但我们觉得不太适合我们拥有数千个生产者的目标用例。 我们大规模运行持久性数据系统的经验让我们觉得,在系统中跨许多应用程序使用数千个磁盘实际上不会使事情变得更可靠,而且操作起来会是一场噩梦。 在实践中,我们发现我们可以大规模运行具有强大 SLA 的管道,而无需生产者持久化。

Producer消费位置

跟踪已消费的内容是消息传递系统的关键性能点之一。

大多数消息传递系统都保留有关broker上已使用哪些消息的元数据。 也就是说,当消息传递给消费者时,broker要么立即在本地记录该事实,要么等待消费者的确认。 这是一个相当直观的选择,实际上对于单机服务器来说,这种状态还能去哪里还不清楚。 由于许多消息传递系统中用于存储的数据结构的伸缩性很差,这也是一个实用的选择——因为broker知道消耗了什么,它可以立即删除它,从而保持较小的数据大小。

让broker和消费者就已经消费的内容达成一致并不是一个小问题。如果broker在每次通过网络分发消息时立即将其记录为已消费,那么如果消费者未能处理该消息(例如因为它崩溃或请求超时或其他原因),该消息将丢失。为了解决这个问题,很多消息系统都添加了确认功能,这意味着消息在发送时只标记为已发送而不是被消费;broker等待来自消费者的特定确认以将消息记录为已消费。这种策略解决了丢失消息的问题,但会产生新的问题。首先,如果消费者处理消息但在发送确认之前失败,则消息将被消费两次。第二个问题是关于性能的,现在broker必须保持每条消息的多个状态(首先锁定它以免第二次发出,然后将其标记为永久消耗以便可以删除)。必须处理棘手的问题,例如如何处理已发送但从未确认的消息。

Kafka 对此有不同的处理方式。 我们的主题分为一组完全有序的分区,每个分区在任何给定时间由每个订阅消费者组中的一个消费者消费。 这意味着消费者在每个分区中的位置只是一个整数,即要消费的下一条消息的偏移量。 这使得关于已消费内容的状态非常小,每个分区只有一个数字。 可以定期检查此状态。 这使得等效的消息确认非常便宜。

这个决定有一个附带好处。 消费者可以故意倒回到旧的偏移量并重新消费数据。 这违反了队列的共同契约,但结果证明是许多消费者的基本特征。 例如,如果消费者代码有一个 bug,并且在消费了一些消息后被发现,那么一旦 bug 被修复,消费者就可以重新消费这些消息。

离线数据加载

可扩展的持久性允许消费者仅定期消费,例如批量数据加载,定期将数据批量加载到离线系统(如 Hadoop 或关系数据仓库)中。

在 Hadoop 的情况下,我们通过将负载拆分为单个映射任务来并行化数据加载,每个节点/主题/分区组合一个,允许加载完全并行。 Hadoop 提供了任务管理,失败的任务可以重新启动,而没有重复数据的危险——它们只需从原始位置重新启动。

static membership

静态成员资格旨在提高基于组重新平衡协议构建的流应用程序、消费者组和其他应用程序的可用性。 重新平衡协议依赖组协调器将实体 ID 分配给组成员。 这些生成的 ID 是短暂的,会在成员重新启动和重新加入时发生变化。 对于基于消费者的应用程序,在代码部署、配置更新和定期重启等管理操作期间,这种“动态成员资格”会导致很大一部分任务重新分配给不同的实例。 对于大型状态应用程序,shuffled 任务需要很长时间才能在处理之前恢复其本地状态,并导致应用程序部分或全部不可用。 受此观察启发,Kafka 的组管理协议允许组成员提供持久的实体 ID。 组成员身份基于这些 id 保持不变,因此不会触发重新平衡。

如果要使用静态成员资格,

将broker集群和客户端应用程序升级到 2.3 或更高版本,并确保升级后的broker也使用 2.3 或更高版本的 inter.broker.protocol.version。 将配置 ConsumerConfig#GROUP_INSTANCE_ID_CONFIG 设置为一个组下每个使用者实例的唯一值。 对于 Kafka Streams 应用程序,为每个 KafkaStreams 实例设置唯一的 ConsumerConfig#GROUP_INSTANCE_ID_CONFIG 就足够了,与实例使用的线程数无关。

如果您的broker版本低于 2.3,但您选择在客户端设置 ConsumerConfig#GROUP_INSTANCE_ID_CONFIG,应用程序将检测代理版本,然后抛出 UnsupportedException。 如果您不小心为不同的实例配置了重复的 id,代理端的防护机制将通过触发 org.apache.kafka.common.errors.FencedInstanceIdException 通知您的重复客户端立即关闭。 有关更多详细信息,请参阅 KIP-345