Contents

Kafka(Go)教程(十一)---Consumer Group & Rebalance

本文主要讲述了 Kafka 的消费者组(Consumer Group)和 消费者组的 Rebalance 及如何避免无效 Rebalance。

Kakfa 相关代码见 Github

1. 传统消息模型

传统消息模型一般分为消息队列模型和发布订阅模型:

  • 1)消息队列模型的缺陷在于消息一旦被消费,就会从队列中被删除,而且只能被下游的一个 Consumer 消费。这种模型的伸缩性(scalability)很差,因为下游的多个 Consumer 都要“抢”这个共享消息队列的消息。

  • 2)发布 / 订阅模型倒是允许消息被多个 Consumer 消费,但它的问题也是伸缩性不高,因为每个订阅者都必须要订阅主题的所有分区。这种全量订阅的方式既不灵活,也会影响消息的真实投递效果。

Kafka 的 Consumer Group 机制正好避开这两种模型的缺陷,又兼具它们的优点。

Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型

  • 如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;
  • 如果所有实例分别属于不同的 Group,那么它实现的就是发布 / 订阅模型。

2. Consumer Group

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。

组内可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费

  • Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。
    • 在实际场景中,使用进程更为常见一些。
  • Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
  • Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。
    • 这个分区当然也可以被其他的 Group 消费。

理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数,这样能最大限度地实现高伸缩性。

注意:Consumer Group 中的实例是以 分区 为单位进行消费的,如果实例数大于分区数就会导致有的实例无法消费到任何消息。

假如有 6 个分区,Consumer Group 中却有 8 个实例,那么有两个实例将不会被分配任何分区,它们永远处于空闲状态。

因此,在实际使用过程中一般不推荐设置大于总分区数的 Consumer 实例

3. Rebalance

Rebalance 流程

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区

比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

那么 Consumer Group 何时进行 Rebalance 呢?Rebalance 的触发条件有 3 个。

  • 1)组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,亦或是有 Consumer 实例崩溃被“踢出”组。
  • 2)订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  • 3)订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

假设目前某个 Consumer Group 下有两个 Consumer,比如 A 和 B,当第三个成员 C 加入时,Kafka 会触发 Rebalance,并根据默认的分配策略重新为 A、B 和 C 分配分区,如下图所示:

https://github.com/lixd/blog/raw/master/images/kafka/kafka-rebalance.webp

Rebalance 的弊端

在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。

其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。

其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。

最后,Rebalance 实在是太慢了。

曾经,有个国外用户的 Group 内有几百个 Consumer 实例,成功 Rebalance 一次要几个小时!这完全是不能忍受的。最悲剧的是,目前社区对此无能为力,至少现在还没有特别好的解决方案。

因此一些大数据框架都使用的 standalone consumer。

避免无效 Rebalance

要避免 Rebalance,还是要从 Rebalance 发生的时机入手:

  • 1)组成员数量发生变化
  • 2)订阅主题数量发生变化
  • 3)订阅主题的分区数发生变化

后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。所以我们主要说说因为组成员数量变化而引发的 Rebalance 该如何避免。

如果 Consumer Group 下的 Consumer 实例数量发生变化,就一定会引发 Rebalance。这是 Rebalance 发生的最常见的原因。

除了手动启动或停止之外,Consumer 实例可能会被 Coordinator 错误地认为“已停止”从而被“踢出”Group。如果是这个原因导致的 Rebalance,我们就不能不管了。

Coordinator 即协调者,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。

当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。

Consumer 端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是 10 秒,即如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。可以这么说,session.timeout.ms 决定了 Consumer 存活性的时间间隔。

Consumer 还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。

Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,你需要仔细地设置 session.timeout.msheartbeat.interval.ms 的值。我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。

最后 Consumer 端的因为 GC 设置不合理导致程序频发 Full GC 也可能引发非预期 Rebalance

小结,调整以下 4 个参数以避免无效 Rebalance:

  • session.timeout.ms
  • heartbeat.interval.ms
  • max.poll.interval.ms
  • GC 参数

4. Go 客户端

// sarama 库中消费者组为一个接口 sarama.ConsumerGroup 所有实现该接口的类型都能当做消费者组使用。

// MyConsumerGroupHandler 实现 sarama.ConsumerGroup 接口,作为自定义ConsumerGroup
type MyConsumerGroupHandler struct {
	name  string
	count int64
}

// Setup 执行在 获得新 session 后 的第一步, 在 ConsumeClaim() 之前
func (MyConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

// Cleanup 执行在 session 结束前, 当所有 ConsumeClaim goroutines 都退出时
func (MyConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim 具体的消费逻辑
func (h MyConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("[consumer] name:%s topic:%q partition:%d offset:%d\n", h.name, msg.Topic, msg.Partition, msg.Offset)
		// 标记消息已被消费 内部会更新 consumer offset
		sess.MarkMessage(msg, "")
		h.count++
		if h.count%10000 == 0 {
			fmt.Printf("name:%s 消费数:%v\n", h.name, h.count)
		}
	}
	return nil
}

func ConsumerGroup(topic, group, name string) {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cg, err := sarama.NewConsumerGroup([]string{conf.HOST}, group, config)
	if err != nil {
		log.Fatal("NewConsumerGroup err: ", err)
	}
	defer cg.Close()
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		handler := MyConsumerGroupHandler{name: name}
		for {
			fmt.Println("running: ", name)
			/*
				![important]
				应该在一个无限循环中不停地调用 Consume()
				因为每次 Rebalance 后需要再次执行 Consume() 来恢复连接
				Consume 开始才发起 Join Group 请求 如果当前消费者加入后成为了 消费者组 leader,则还会进行 Rebalance 过程,从新分配
				组内每个消费组需要消费的 topic 和 partition,最后 Sync Group 后才开始消费
				具体信息见 https://github.com/lixd/kafka-go-example/issues/4
			*/
			err = cg.Consume(ctx, []string{topic}, handler)
			if err != nil {
				log.Println("Consume err: ", err)
			}
			// 如果 context 被 cancel 了,那么退出
			if ctx.Err() != nil {
				return
			}
		}
	}()
	wg.Wait()
}

注意点:

  • 1)需要实现 sarama.ConsumerGroup 接口
  • 2)具体消费逻辑在 ConsumeClaim 方法中实现,消费完成后通过sess.MarkMessage标记消息已经被消费
  • 3)需要在 for 循环中调用 Consume 方法,因为 Rebalance 的存在会导致连接终端或者被分配到其他 Broker 上去,需要重新建立连接,具体见 issues#4

5. 小结

Kakfa 相关代码见 Github

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。

建议消费者组中的实例数等于 Topic 的分区数。

Rebalance 弊端:

  • 影响 Consumer 端 TPS
  • 很慢
  • 效率不高

非必要 Rebalance:

  • 因为Consumer没能及时发送心跳请求,导致被踢出”Group而引发的。
  • Consumer消费时间过长导致的。

减少 Rebalance 的 4个参数:

  • session.timeout.ms
  • heartbeat.interval.ms
  • max.poll.interval.ms
  • GC 参数

6. 参考

https://github.com/Shopify/sarama

《Kafka 核心技术与实战》