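In Kafka, a CommitFailedException arises in the following cases:
- The consumer is removed from the consumer group by a rebalance because its processing time exceeds max.poll.interval.ms
- The consumer is removed from the consumer group by a rebalance because no heartbeat arrives within session.timeout.ms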
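The two cases differ in who notices the problem. In kafka-clients, the poll-interval check runs in the consumer's own background heartbeat thread. The session timeout is enforced by the group coordinator on the broker. The following is a hypothetical, simplified model that classifies which condition has been violated at a given moment; `EvictionCause` and `classify` are illustrative names, not Kafka APIs.

```java
// Hypothetical, simplified model of the two eviction conditions (not kafka-clients code).
final class EvictionCause {
    enum Cause { NONE, POLL_INTERVAL_EXCEEDED, SESSION_TIMEOUT }

    static Cause classify(long nowMs, long lastPollMs, long lastHeartbeatMs,
                          long maxPollIntervalMs, long sessionTimeoutMs) {
        // Case 2: no heartbeat reached the coordinator within session.timeout.ms,
        // e.g. because the whole JVM (heartbeat thread included) is paused.
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            return Cause.SESSION_TIMEOUT;
        }
        // Case 1: heartbeats still flow, but poll() has not been called again
        // within max.poll.interval.ms because processing takes too long.
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            return Cause.POLL_INTERVAL_EXCEEDED;
        }
        return Cause.NONE;
    }
}
```

Take this post's settings as an example. `classify(7000, 0, 6000, 5000, 10000)` reports `POLL_INTERVAL_EXCEEDED`: heartbeats are current, but 7 s of processing overran the 5 s poll interval.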
1. Removed from the consumer group by a rebalance because processing takes longer than max.poll.interval.ms
In the ConsumerConfig class, set max.poll.interval.ms to 5 seconds:

```java
@Bean
public ConsumerFactory<String, JsonSerializable> consumerFactory() {
    ...
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000); // 5 seconds
    ...
}
```
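Four consumer properties govern both failure modes. The sketch below uses their literal names, which are the values of `ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG`, `SESSION_TIMEOUT_MS_CONFIG`, `HEARTBEAT_INTERVAL_MS_CONFIG` and `MAX_POLL_RECORDS_CONFIG`, so it runs without kafka-clients on the classpath. Values other than the 5 s above assume the kafka-clients 2.5 defaults. The `TimeoutConfigCheck` class and its warnings are hypothetical. The 1/3 rule follows the Kafka documentation's guidance for heartbeat.interval.ms.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TimeoutConfigCheck {
    // This post's test setup: max.poll.interval.ms lowered to 5 s, the rest assumed at defaults.
    static Map<String, Object> testProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("max.poll.interval.ms", 5000);   // ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
        props.put("session.timeout.ms", 10000);    // ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
        props.put("heartbeat.interval.ms", 3000);  // ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG
        props.put("max.poll.records", 500);        // ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        return props;
    }

    // Hypothetical sanity check that flags settings making either failure mode likely.
    static List<String> warnings(Map<String, Object> props, long worstCaseRecordMs) {
        int maxPollInterval = (Integer) props.get("max.poll.interval.ms");
        int session = (Integer) props.get("session.timeout.ms");
        int heartbeat = (Integer) props.get("heartbeat.interval.ms");
        int maxPollRecords = (Integer) props.get("max.poll.records");
        List<String> out = new ArrayList<>();
        if (heartbeat * 3 > session) {
            out.add("heartbeat.interval.ms should be at most 1/3 of session.timeout.ms");
        }
        // poll() is called again only after the whole batch has been processed.
        if ((long) maxPollRecords * worstCaseRecordMs > maxPollInterval) {
            out.add("a full batch may exceed max.poll.interval.ms");
        }
        return out;
    }
}
```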
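Then make the Kafka subscriber sleep for 7 seconds:

```java
@KafkaHandler
public void handle(TextMessage textMessage) {
    try {
        // Simulate 7 s of processing, longer than max.poll.interval.ms (5 s)
        Thread.sleep(7000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag before rethrowing
        throw new RuntimeException(e);
    }
    System.err.println("request commit");
}
```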
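The logic above finishes and the listener container commits the offset. At that point a CommitFailedException is thrown, and Spring Kafka's error handler reports:

```text
This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
```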
Test with a single subscriber:

Time | subscriber1 |
---|---|
00:00:00 | Start processing (takes 7 s) |
00:00:05 | Rebalance; removed from the consumer group |
00:00:07 | Commit fails with CommitFailedException |
00:00:07 | Processing runs again (7 s); the record's offset was never committed, so it is delivered again |
Test with two subscribers:

Time | subscriber1 | subscriber2 |
---|---|---|
00:00:00 | Start processing | - |
00:00:05 | Removed after 5 s -> rebalance, record reassigned to subscriber2 | Start processing |
00:00:07 | Finishes after 7 s; CommitFailedException | |
00:00:10 | Start processing | Removed after 5 s -> rebalance, record reassigned to subscriber1 |
00:00:12 | - | CommitFailedException |
The record keeps bouncing between the subscribers in this way, and its offset is never committed.
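Both tables can be reproduced with a toy timeline. This is a hypothetical, deliberately simplified model rather than Kafka code. It assumes:

- one partition and one uncommitted record that always takes `processingMs` to process;
- eviction exactly `maxPollIntervalMs` after processing starts;
- redelivery of that record to whichever subscriber owns the partition next.

With a second, idle subscriber, the partition moves as soon as the busy one is evicted. With a single subscriber, the record is re-fetched only after the failed commit.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical toy model of the rebalance loop shown in the tables (not Kafka code).
final class RebalanceLoopSim {
    private static final class Event {
        final long atMs;
        final String text;

        Event(long atMs, String text) {
            this.atMs = atMs;
            this.text = text;
        }
    }

    /** Returns "timeMs subscriberN action" lines up to horizonMs, ordered by time. */
    static List<String> simulate(int members, long processingMs,
                                 long maxPollIntervalMs, long horizonMs) {
        if (members < 1 || maxPollIntervalMs <= 0) {
            throw new IllegalArgumentException("need >= 1 member and a positive poll interval");
        }
        List<Event> events = new ArrayList<>();
        if (processingMs <= maxPollIntervalMs) {
            // Processing fits inside the poll interval, so the commit succeeds.
            events.add(new Event(0, "subscriber1 start"));
            events.add(new Event(processingMs, "subscriber1 commit ok"));
        } else {
            long start = 0;
            for (int k = 0; start <= horizonMs; k++) {
                String member = "subscriber" + (k % members + 1);
                long evictedAt = start + maxPollIntervalMs; // poll timeout -> leaves the group
                long commitAt = start + processingMs;       // late commit -> rejected
                events.add(new Event(start, member + " start"));
                events.add(new Event(evictedAt, member + " evicted"));
                events.add(new Event(commitAt, member + " CommitFailedException"));
                // An idle peer takes over at eviction; a lone member retries after its failed commit.
                start = members > 1 ? evictedAt : commitAt;
            }
        }
        // List.sort is stable, so events at the same time keep their insertion order.
        events.sort(Comparator.comparingLong((Event e) -> e.atMs));
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (e.atMs <= horizonMs) {
                out.add(e.atMs + " " + e.text);
            }
        }
        return out;
    }
}
```

For instance, `simulate(2, 7000, 5000, 12000)` yields the same sequence as the two-subscriber table:

- subscriber1 is evicted at 5000 ms and subscriber2 starts at 5000 ms;
- subscriber1's commit fails at 7000 ms;
- the roles swap at 10000 ms;
- subscriber2's commit fails at 12000 ms.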
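The consumer finishes processing after 7 s and then tries to commit. By then it has already been removed from the consumer group, at the 5 s mark. Its background heartbeat thread detected that max.poll.interval.ms had passed without a call to poll() and sent a LeaveGroup request to the coordinator. At commit time the consumer is therefore no longer part of an active group, so commit() fails with a CommitFailedException.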
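Why the late commit is refused can be illustrated with a toy model of group membership. Each rebalance moves the group to a new generation. A commit that carries an unknown member id or an outdated generation is rejected; in Kafka these surface as errors such as UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION, which the client turns into a CommitFailedException. The `ToyCoordinator` class is hypothetical and far simpler than the real group coordinator.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model of the coordinator's commit check (not broker code).
final class ToyCoordinator {
    private final Set<String> members = new HashSet<>();
    private int generation = 0;

    // Every membership change triggers a rebalance, i.e. a new generation.
    int join(String memberId) {
        members.add(memberId);
        return ++generation;
    }

    void evict(String memberId) {
        if (members.remove(memberId)) {
            generation++;
        }
    }

    // Rejects commits from unknown members or from an outdated generation,
    // the idea behind Kafka's UNKNOWN_MEMBER_ID / ILLEGAL_GENERATION errors.
    void commit(String memberId, int memberGeneration, long offset) {
        if (!members.contains(memberId) || memberGeneration != generation) {
            throw new IllegalStateException("commit of offset " + offset
                    + " rejected: " + memberId + " is not in generation " + generation);
        }
    }
}
```

Applied to the single-subscriber test:

- subscriber1 joins (generation 1);
- it is evicted at 5 s (generation 2);
- its commit at 7 s still carries generation 1, so it is refused.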
Log message showing the consumer leaving the group:

```text
INFO 4844 --- [balancing-group] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-kafka-test-rebalancing-group-1, groupId=kafka-test-rebalancing-group] Member consumer-kafka-test-rebalancing-group-1-f9a193f4-499e-4507-913a-0de16c28394d sending LeaveGroup request to coordinator 172.16.120.203:9092 (id: 2147482646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
```
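The log message suggests two remedies: increasing max.poll.interval.ms or lowering max.poll.records. Both follow from the same rough bound. In a typical poll loop, poll() is called again only after the whole batch has been processed, so max.poll.records × worst-case processing time per record should stay below max.poll.interval.ms. A sketch of that arithmetic follows. `PollBudget`, its method names and the safety factor are illustrative assumptions, not Kafka settings.

```java
// Hypothetical helper for back-of-the-envelope sizing (not a Kafka API).
final class PollBudget {
    // Smallest max.poll.interval.ms that covers a full batch, with headroom.
    static long requiredMaxPollIntervalMs(int maxPollRecords, long worstCaseRecordMs,
                                          double safetyFactor) {
        return (long) Math.ceil(maxPollRecords * worstCaseRecordMs * safetyFactor);
    }

    // Largest max.poll.records that still fits inside the configured interval.
    // Returns at least 1; if even one record does not fit, only a longer interval
    // (or faster processing) helps.
    static int maxRecordsFor(long maxPollIntervalMs, long worstCaseRecordMs,
                             double safetyFactor) {
        return (int) Math.max(1, Math.floor(maxPollIntervalMs / (worstCaseRecordMs * safetyFactor)));
    }
}
```

Take the 7 s handler from this post with one record per poll and a 1.5× margin. `requiredMaxPollIntervalMs(1, 7000, 1.5)` gives 10500 ms. Conversely, under the same margin, the 300000 ms default leaves room for 28 such records per batch.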
The resulting error log is:
```text
2022-07-14 07:57:11.350 INFO 3070 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-kafka-test-rebalancing-group-1, groupId=kafka-test-rebalancing-group] Failing OffsetCommit request since the consumer is not part of an active group
2022-07-14 07:57:11.351 ERROR 3070 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1401) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514) [na:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) [na:na]
at java.base/java.lang.Thread.run(Thread.java:844) [na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1116) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:983) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510) ~[kafka-clients-2.5.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2378) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2373) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2359) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2173) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
... 4 common frames omitted
```
2. Removed from the consumer group by a rebalance because no heartbeat arrives within session.timeout.ms
The consumer periodically sends heartbeats to the broker acting as group coordinator, every 3 seconds by default (heartbeat.interval.ms).
To see these heartbeat messages in the log, add the following logging level (for example in application.yml):

```yaml
logging:
  level:
    org.apache.kafka.clients.consumer.internals.AbstractCoordinator: debug
```
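Set a breakpoint in the subscriber's logic that suspends all threads, so that the consumer's background heartbeat thread is paused as well. No heartbeats are then sent. Once session.timeout.ms (10 s by default in the kafka-clients 2.5.1 used here) has passed since the last heartbeat, the coordinator rebalances the group and removes the consumer. When execution resumes, the logic completes and the commit fails with a CommitFailedException.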
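The timing can be sketched with a toy session tracker. Heartbeats normally arrive every heartbeat.interval.ms (3 s), and the member is dropped once none has arrived for session.timeout.ms (10 s here). `SessionTracker` is a hypothetical illustration of the coordinator-side rule, not broker code. Timestamps are passed in explicitly so the example is deterministic.

```java
// Hypothetical model of the coordinator's session check; not broker code.
final class SessionTracker {
    private final long sessionTimeoutMs;
    private long lastHeartbeatMs;

    SessionTracker(long sessionTimeoutMs, long joinedAtMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.lastHeartbeatMs = joinedAtMs;
    }

    // Called whenever a heartbeat from this member arrives.
    void onHeartbeat(long nowMs) {
        lastHeartbeatMs = nowMs;
    }

    // True once no heartbeat has arrived for longer than session.timeout.ms.
    boolean isExpired(long nowMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs;
    }

    // Time at which the member is dropped if no further heartbeat arrives.
    long expiresAtMs() {
        return lastHeartbeatMs + sessionTimeoutMs;
    }
}
```

Suppose the last heartbeats go out at 3 s and 6 s and a breakpoint suspends the JVM at 7 s. Then `expiresAtMs()` is 16000, so the member is removed 10 s after the last heartbeat, about 9 s after the breakpoint was hit.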
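The error message is shown below. In case 1, the client fails the commit locally before sending it (`Failing OffsetCommit request since the consumer is not part of an active group`). Here, by contrast, the broker rejects the commit, as the `OffsetCommitResponseHandler` frames show. The exception text is CommitFailedException's generic message, so it mentions max.poll.interval.ms even though the actual cause is the missed session timeout.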
```text
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1401) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514) [na:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) [na:na]
at java.base/java.lang.Thread.run(Thread.java:844) [na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1213) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1140) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1096) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1076) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:984) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510) ~[kafka-clients-2.5.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2378) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2373) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2359) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2173) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059) [spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
... 4 common frames omitted
```