您的当前位置:首页正文

消息中间件-kafka实战-第五章-kafka重复消费、顺序消费及死信队列

2024-11-08 来源:个人技术集锦

一、参考


二、路由规则(分片规则)

发一个消息,如何知道消息被默认分片到哪里

1.如果没有指定key,是随机分片

2.如果指定了key,即 kafkaTemplate.send(topic, null, jsonValue);

可以套用一下公式计算:

      key.hashCode() % 12

例如有一个topic 叫test,有8个patition,key=“1”,则日志文件在

    "1".hashCode() % 8=1

三、触发重复消费的场景

场景一:触发rebalance

参考:
参考:

rebalance就是kafka认为消费者已经离线或者挂掉,就会触发rebalance把消息分配给新的消费者kafka重新平衡是按group
即当消费速度过慢时有可能会触发rebalance, 这批消息被分配到另一个消费者,然后新的消费者还会消费过慢,再次rebalance, 这样一直恶性循环下去。发生这种情况最明显的标志就是日志里能看到CommitFailedException异常,然后还会带上下面一段话:

问题描述

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

可能原因

  • 原因1:强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)。
  • 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
  • 原因3:(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
  • 原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
  • 原因5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。
  • 原因6:并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致reblance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费

实际影响参数

这里我们需要明确一下,在Kafka 0.10.1.0以后的版本中,影响rebalance触发的参数有三个,说明如下:

  • spring.kafka.properties.session.timeout.ms(默认10秒10000)
    这个参数定义了当broker多久没有收到consumer的心跳请求后就触发rebalance,默认值是10s。在0.10.1.0之前的版本中,由于心跳请求是在poll()拉取消息的方法中执行的,因此如果当前批次处理消息耗时太长,就会导致consumer没有机会按时发送心跳,broker认为消费者已死,触发rebalance。在0.10.1.0或更新的版本中解决了这个问题,心跳请求会在单独的线程中发送,因此就不会出现因为消息处理过长而发不出心跳的问题了。而每次发送心跳请求的时间 spring.kafka.properties.heartbeat.interval.ms = 3000(默认三秒)

  • spring.kafka.properties.max.poll.interval.ms(默认值为5分钟300000)
    这个参数定义了两次poll()之间的最大间隔,默认值为5分钟。如果超过这个间隔同样会触发rebalance。在多数情况下这个参数是导致rebalance消息重复的关键,即业务处理消息耗时太长,导致一直没有commit确认收到的消息,然后超过了消费者设置的最大拉取时间。有人可能会疑惑,如果5分钟都没处理完消息那肯定时出了问题,其实不然。能否在5min内处理完还取决于你每次拉取了多少条消息,如果一次拿到了成千上万条的话,5min就够呛了。有也可能是某个消费者节点正在调试,导致线程一直阻塞在那里,然后超过了最大拉取时间.

  • spring.kafka.consumer.max.poll.records
    这个参数定义了poll()方法最多可以返回多少条消息,默认值为500。注意这里的用词是"最多",也就是说如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,就只返回500。这个默认值是比较坑人的,如果你的消息处理逻辑比较重,比如需要查数据库,调用接口,甚至是复杂计算,那么你很难保证能够在5min内处理完500条消息,也就是说,如果上游真的突然大爆发生产了成千上万条消息,而平摊到每个消费者身上的消息达到了500的又无法按时消费完成的话就会触发rebalance, 然后这批消息会被分配到另一个消费者中,还是会处理不完,又会触发rebalance, 这样这批消息就永远也处理不完,而且一直在重复处理。

在kafka0.10.1 之前:

检查整个消费者死亡和检查消费则处理线程,使用的同一个线程,如果设置的max.poll.interval.ms大于session.timeout.ms,遇到一个处理时间过长的消息,会由于线程忙于处理消息,而无法发送心跳,导致kafka认为改消费则已完全死亡,进而进行Rebalance
所以推荐设置:heartbeat.inerval.ms < max.poll.interval.ms < session.timeout.ms

在kafka0.10.1之后:

session.timeout.ms 和 max.poll.interval.ms 解耦了,拆成了两个线程,不用再担心它们之间的依赖关系
推荐设置:heartbeat.interval.ms < session.timeout.ms

解决方案

要避免出现上述问题也很简单,那就是提前评估好处理一条消息最长需要多少时间,然后务必覆盖默认的max.poll.records参数。在spring-kafka中这个原生参数对应的参数项是max-poll-records。对于消息处理比较重的操作,建议把这个值改到50以下会保险一些。

  • 调整几个参数
spring.kafka.properties.max.poll.interval.ms = 600000
spring.kafka.consumer.max.poll.records = 20
spring.kafka.properties.session.timeout.ms = 25000

# spring设置kafka参数session超时时间时,要小于请求超时时间与处理超时时间,例如:

request.timeout.ms = 30000  session.timeout.ms = 15000    max.poll.interval.ms = 300000

session.timeout.ms < request.timeout.ms

session.timeout.ms < max.poll.interval.ms
  • 把这个组里比较重要的几个topic移动出去,换到其它组(java里只需要改一行):
//这里没有显式配置组,用的是上方KafkaConfig.java里的commonGroup组
//@KafkaListener(topics = "${kafka.topic.commit}")

//改为了显式配置组,把这个topic移动到新组 commitGroup
@KafkaListener(topics = "${kafka.topic.commit}", groupId = "commitGroup")
  • 减少每次拉取的消息记录数和增大poll之间的时间间隔
  • 拉取到消息之后异步处理(保证成功消费)

场景二:服务宕机

可能原因

  • 消费者宕机、重启等。导致消息已经消费但是没有提交offset。

  • 由于网络问题,重复消费不可避免,因此,消费者需要实现消费幂等。#

解决方案

  • ①:消息表

  • ②:数据库唯一索引

  • ③:缓存消费过的消息id

消息幂等性

可以通过redis.setnx方法
key = topic:pardition:offset
redis.setnx(key ,alue);如果没设置过返回1,设置过返回0

四、kafka参数特性

新版kafka的broker幂等性具体实现原理:
  kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和    Sequence Number,如果相同不会再接收。

PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID对用户完全是透明的。生产者如果重启则会生成新的PID。

常见配置

fetch.min.byte:配置Consumer一次拉取请求中能从Kafka中拉取的最小数据量,默认为1B,如果小于这个参数配置的值,就需要进行等待,直到数据量满足这个参数的配置大小。调大可以提交吞吐量,但也会造成延迟

fetch.max.bytes,一次拉取数据的最大数据量,默认为52428800B,也就是50M,但是如果设置的值过小,甚至小于每条消息的值,实际上也是能消费成功的

fetch.wait.max.ms,若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间,默认是500ms

max.poll.records,单次poll调用返回的最大消息记录数,如果处理逻辑很轻量,可以适当提高该值。一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完,默认值为500

consumer.poll(100) ,100 毫秒是一个超时时间,一旦拿到足够多的数据(fetch.min.bytes 参数设置),consumer.poll(100)会立即返回 ConsumerRecords<String, String> records。如果没有拿到足够多的数据,会阻塞100ms,但不会超过100ms就会返回

max.poll.interval.ms,两次拉取消息的间隔,默认5分钟;通过消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时间间隔没有发起poll操作,则消费组认为该消费者已离开了消费组,将进行再均衡操作(将分区分配给组内其他消费者成员)

若超过这个时间则报如下异常:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member. This means that the time between subsequent calls
to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either by increasing the session timeout or by
reducing the maximum size of batches returned in poll() with max.poll.records. 
  即:无法完成提交,因为组已经重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多的时间来处理消息。

可以通过增加max.poll.interval.ms来解决这个问题,也可以通过减少在poll()中使用max.poll.records返回的批的最大大小来解决这个问题

max.partition.fetch.bytes:该属性指定了服务器从每个分区返回给消费者的最大字节数,默认为 1MB。

session.timeout.ms:消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s,将触发再均衡操作

对于每一个Consumer Group,Kafka集群为其从Broker集群中选择一个Broker作为其Coordinator。Coordinator主要做两件事:

维持Group成员的组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。

协调Group成员的行为。

poll机制

①:每次poll的消息处理完成之后再进行下一次poll,是同步操作

②:每次poll之前检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移

③:每次poll时,consumer都将尝试使用上次消费的offset作为起始offset,然后依次拉取消息

④:poll(long timeout),timeout指等待轮询缓冲区的数据所花费的时间,单位是毫秒

五、死信队列

方案一:工厂类KafkaListenerContainerFactory

配置工厂类

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.*;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional;

/**
 * 消费MQ的消息配置
 * @author demo
 * @create 2022-11-12
 */

@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@ConditionalOnProperty(name = "mq.pipeline", havingValue = IotConstant.MQ_KAFKA)
public class KafkaReceiverConfig {
    @Resource
    private ConsumerFactory<String,String> consumerFactory;
    @Resource
    private KafkaTemplate<String,String>  kafkaTemplate;


    @Bean
    public KafkaListenerContainerFactory<?> retryKafkaFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // 最大重试次数2次,重试间隔10秒,超过2次(本身一次,重试一次)还没成功,进入死信队列
        // 注意:目前自动创建主题的配置关闭了,需要提前手动去创建好死信队列主题!!! 死信队列主题的命名方式:原主题名称 + .DLT
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(Collections.singletonMap(Object.class, kafkaTemplate)
                , (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition()))
                , new FixedBackOff(10 * 1000L, 2L)));
        return factory;
    }

}

`### 消费者``

```java
 /**
     * 发送消息
     * @param consumerRecord 消息记录
     * @param topicGroupId 消费组
     */
    @KafkaListener(topics = "#{'${mq.alarm.inner.topic.name}'.split(',')}", containerFactory = "retryKafkaFactory")
    public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {
        Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        if (kafkaMessage.isPresent()) {
            String id = null;
            try {
                String json = kafkaMessage.get();
                
                // todo
                ...
            } catch (Throwable t) {
                // 判断是否可恢复异常
                if(isRecoverable(t)){
                    // ...
                    IotCacheUtil.deleteMessageId(id);
                    // 送入死信队列
                    throw  t;
                } else {
                    log.error("消费失败 topic:{}, messageId:{}, offset:{}, partition:{} ,异常:{}",consumerRecord.topic(),id,consumerRecord.offset(),consumerRecord.partition(),t);

                }

            }
        }
    }

死信队列消费者

@KafkaListener( topics = "iotAiAlarmInner.DLT")
    public void messageListenerDLT(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {
        log.info("告警死信队列 topic:{}, offset:{}, partition:{} ,key:{}, message:{}",consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),consumerRecord.key(),Optional.ofNullable(consumerRecord.value()).orElse(null));
        Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        if (kafkaMessage.isPresent()) {
        }

    }

方案二:错误ConsumerAwareListenerErrorHandler

配置


/**
 * 消费MQ的消息配置
 * @author demo
 * @create 2022-11-12
 */

@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@ConditionalOnProperty(name = "mq.pipeline", havingValue = IotConstant.MQ_KAFKA)
public class KafkaReceiverConfig {
    @Resource
    private ConsumerFactory<String,String> consumerFactory;
    @Resource
    private KafkaTemplate<String,Object>  kafkaTemplate;

    /**
     * 针对tag消息过滤
     * producer 将tag写进header里
     * @return ConcurrentKafkaListenerContainerFactory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);

        factory.setRecordFilterStrategy(consumerRecord -> {
            if (Optional.ofNullable(consumerRecord.value()).isPresent()) {
                for (Header header : consumerRecord.headers()) {
                    if (header.key().equals(IotConstant.MQ_TAG) && new String(header.value()).equals(new String(IotConstant.MQ_TAG_VALUE.getBytes(StandardCharsets.UTF_8)))) {
                        return false;
                    }
                }
            }
            //返回true将会被丢弃
            return true;
        });
        return factory;
    }



    @Bean
    public ConsumerAwareListenerErrorHandler consumerIotAlarmAwareErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {

            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
                log.error("consumerAwareErrorHandler receive : {}, error:{}",message.getPayload(),e);

                //获取消息处理异常主题
                MessageHeaders headers = message.getHeaders();
                /*List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
                List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
                List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);*/
                Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
//                String topic=headers.get("kafka_receivedTopic")+TOPIC_DLT;
                String topic="iotAlarmInner"+ KafkaAlarmListener.TOPIC_DLT;
                //放入死信队列
                kafkaTemplate.send(topic,message.getPayload());
                return message;
            }
        };
    }
}

消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.UnknownHostException;
import java.util.Optional;

/**
 * 内部自产自销消费者
 * @author demo
 * @create 2022-11-09
 */
@Slf4j
@Component
@ConditionalOnProperty(name = "mq.pipeline", havingValue = IotConstant.MQ_KAFKA)
public class KafkaAlarmListener {

    @PostConstruct
    public void init() {
        log.info("启动了通用告警消费者");
    }

    public static final String TOPIC_DLT=".DLT";

    @Resource
    private DeviceService deviceService;

    /**
     * 发送消息
     * @param consumerRecord 消息记录
     * @param topicGroupId 消费组
     */
    @KafkaListener(topics = "#{'${mq.alarm.inner.topic.name}'.split(',')}",errorHandler = "consumerIotAlarmAwareErrorHandler",concurrency = "3")
    public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {
        execute(consumerRecord, false);
    }

    /**
     * 死信队列
     * @param consumerRecord 消息记录
     * @param topicGroupId 消费组
     */
    @KafkaListener( topics = "iotAlarmInner"+TOPIC_DLT)
    public void deadConsumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {
        log.info("告警死信队列 topic:{}, offset:{}, partition:{} ,key:{}",consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),consumerRecord.key());
        execute(consumerRecord, true);
    }

    private void execute(ConsumerRecord<?, String> consumerRecord,boolean dead){
        String title = dead?"告警死信":"告警";
        Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        if (kafkaMessage.isPresent()) {
            String id = null;
            try {
                String json = kafkaMessage.get();
                // 解析
                AlarmMessageDTO message = JsonUtil.parse(json,AlarmMessageDTO.class);
                // 判断消息是否已处理
                id = message.getId();
                if(json.length()>10000){

                    log.info("{} topic:{}, offset:{}, partition:{} ,key:{}, messageId:{}",title,consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),consumerRecord.key(),id);
                }else{

                    log.info("{} topic:{}, offset:{}, partition:{} ,key:{}, content:{}, messageId:{}",title,consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),consumerRecord.key(),id, json);
                }
                if(IotCacheUtil.isExistMessageId(id)){
                    log.error("{} topic:{}, messageId:{} 重复消息",title,consumerRecord.topic(), id);
                    return;
                }

                // 处理
                deviceService.uploadAlarm(message);
            } catch (Throwable t) {
                if(dead){
                    log.error("{} 消费异常 topic:{}, offset:{}, partition:{}, messageId:{} ,异常:{}",title,consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),id,t);
                    return;
                }
                // 判断是否可恢复异常
                if(isRecoverable(t)){
                    // ...
                    IotCacheUtil.deleteMessageId(id);
                    // 触发失败errorHandler死信队列
                    throw  t;
                } else {
                    log.error("{} 消费失败 topic:{}, offset:{}, partition:{}, messageId:{} ,异常:{}",title,consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),id,t);

                }

            }
        }
    }

    private boolean isRecoverable(Throwable t){
        if(t instanceof NullPointerException){
            return false;
        }else if( t instanceof UnknownHostException){
            return false;
        }
        return true;
    }

}

六、顺序消费

利用kafka的分区(pardition)功能,通过生产者send的时候设置key,kafka的broker会根据key计算hash,发送到对应的分区

场景

比如用户三次修改名字
我们再发送消息的时候,把userId设置为key,这样保证三条消息都在一个pardition

Top