일반적인 서비스 로직을 작성할 때 메소드 내에서 복잡한 서비스 로직을 수행하고 이벤트(kafka)를 발행하는 경우가 많다. 이런 경우 트랜잭션이 성공적으로 완료된 이후에 이벤트가 발행되기를 기대하지만 로직 내의 실행 순서에 따라 의도하지 않게 이벤트가 발행이 되는 경우도 있다.
그래서 스프링에서는 트랜잭션을 종료하고 이벤트를 발행하는 방법으로 @TransactionalEventListener
를 사용을 한다. @TransactionalEventListener
를 사용하게 되면 트랜잭션이 종료된 이후에 이벤트를 발행하는 것이기 때문에 이벤트 발행시점을 트랜잭션 이후로 미룰 수 있는 장점이 있다.
하지만, 여기서는 @TransactionalEventListener
를 사용하지 않고 kafka 트랜잭션과 DB 트랜잭션을 연관시켜서 DB 트랜잭션이 종료된 이후에 이벤트 발행이 되도록 하는 방법을 알아보자.
시나리오
우선, 테스트를 시나리오는 아래와 같다.
- 사용자 등록
- 이벤트 발행 (Publisher)
- 이벤트 수신 (Consumer)
소스 코드
서비스 로직 (사용자 등록)
@Transactional
public void register(String name) throws InterruptedException {
kafkaTemplate.send("event-user-topic", createEvent(name));
Thread.sleep(1000); // 1초간 대기
if (name == null) { // name이 null이면 예외 발생
throw new IllegalArgumentException();
}
}
private UserCreatedEvent createEvent(String name) {
User user = new User(name);
return new UserCreatedEvent(user);
}
subscriber
@Slf4j
@Component
@KafkaListener(
topics = "event-user-topic",
groupId = "user-group-id"
)
public class UserSubscriber {
@KafkaHandler
public void create(UserEvent event) {
log.info("received userEvent: " + event.getUser().toJson());
// 로직
}
}
로그 설정
필요한 로그만 보도록 로그를 설정한다.
logging:
level:
root: error
com.example.user: debug
org.springframework.kafka.core: trace #kafka 로그
org.springframework.transaction.interceptor: trace # 트랜잭션 로그
Kafka Producer 설정
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {
private final KafkaProperties kafkaProperties;
@Bean
public ProducerFactory<String, JsonSerializable> producerFactory() {
Map<String, Object> props = kafkaProperties.buildProducerProperties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
DefaultKafkaProducerFactory<String, JsonSerializable> pf = new DefaultKafkaProducerFactory<>(props);
return pf;
}
@Bean
public KafkaTemplate<String, JsonSerializable> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
테스트 케이스 작성
실행 로그를 확인하기 위해 테스트 코드를 작성하자.
@SpringBootTest
public class UserServiceTest {
@Autowired
UserService userService;
@Test
public void register() throws InterruptedException {
userService.register();
Thread.sleep(1000);
}
}
실행해보면 아래와 같이 로그가 남는다.
Getting transaction for [com.example.user.service.UserService.register]
Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5a48d186]
Sending: ProducerRecord(topic=event-user-topic, ...)
Sent: ProducerRecord(topic=event-user-topic, partition=null, ...)
Sent ok: ProducerRecord(topic=event-user-topic, partition=null, headers=RecordHeaders(...
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5a48d186] close(PT5S)
received userEvent: {
"name" : "홍길동"
}
Completing transaction for [com.example.user.service.UserService.register]
- 로그의 순서를 확인해보면 이벤트를 전송한 이후에 트랜잭션이 완료(
Completing transaction
)되는 것을 확인할 수 있다. - 트랜잭션이 종료되기 전에 subscriber가 이미 이벤트를 받아 처리가 되었다.
우리가 하고자 하는 것은 이벤트 발행과 DB 트랜잭션을 묶어서 DB 트랜잭션이 완료되고 나면 이벤트를 발행하도록 하는 것이다.
해결방안
카프카에서 프로듀서는 트랜잭션 코디네이터에 요청을 함으로써 트랜잭션을 시작한다.
스프링 부트에서 트랜잭션을 사용하기 위해서는 단순히 spring.kafka.producer.transaction-id-prefix
값을 설정하기만 하면 된다. (application.yml 혹은 KafkaProducerConfig에 설정)
스프링 부트에서는 자동적으로 KafkaTransactionManager
bean을 구성하고 listener 컨테이너를 연결한다.
즉, 아래와 같이 추가하면 된다.
@Bean
public ProducerFactory<String, JsonSerializable> producerFactory() {
...
DefaultKafkaProducerFactory<String, JsonSerializable> factory = new DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix("tx-"); // 추가
return factory;
}
위와 같이 설정하고 테스트 케이스를 다시 실행해보자.
@Test
public void register() throws InterruptedException {
userService.register();
Thread.sleep(1000);
}
실행결과
Getting transaction for [com.example.user.service.UserService.register]
Sending: ProducerRecord(topic=event-user-topic, partition=null, ...
Sent: ProducerRecord(topic=event-user-topic, partition=null, ...
Sent ok: ProducerRecord(topic=event-user-topic, partition=null, ...
userSubscriber: {
"name" : "홍길동"
}
Completing transaction for [com.example.user.service.UserService.register]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@38022758] commitTransaction()
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@38022758] close(PT5S)
실행결과를 보면 이전에는 메시지 전송 후에 트랜잭션을 종료했지만 여기서는
Completing transaction
이후에 KafkacommitTransaction()
이 나오는 것을 보니 DB 트랜잭션이 끝나고 kafka 트랜잭션을 커밋을 하는 것을 알 수 있다.그런데 한 가지 이상한 점은 producer 입장에서는 트랜잭션을 종료한 이후에 이벤트를 커밋했는데 subscriber에서는 트랜잭션이 커밋된 이후 데이터를 읽는 것이 아닌 커밋 전의 데이터를 읽는 문제가 있다. 즉, 트랜잭션 커밋이 완료되면 subscriber가 받을 것으로 기대했지만, 그렇지 않은 것이다.
트랜잭션 실패 케이스
여기에서 트랜잭션 실패 케이스를 한번 작성하고 실행해보자.
@Test
public void registerWithFail() throws InterruptedException {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
userService.register(null);
Thread.sleep(1000);
});
}
실행 결과
Getting transaction for [com.example.user.service.UserService.register]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@8dedec8] beginTransaction()
Sent: ProducerRecord(topic=event-user-topic, partition=null,
Sent ok: ProducerRecord(topic=event-user-topic, partition=null,
received userEvent: {
"name" : null
}
Completing transaction for [com.example.user.service.UserService.register] after exception: java.lang.IllegalArgumentException
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@8dedec8] abortTransaction()
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@8dedec8] close(PT5S)
이 케이스에서 보면 트랜잭션은 실패하고 롤백되었다는 것이 표시되는데 userEvent는 수신이 된 것을 확인할 수 있다.
즉, Publisher는 트랜잭션 처리가 잘 되는 것으로 보이지만, Consumer는 트랜잭션과 무관하게 작동되는 듯 하다.
Kafka Consumer 트랜잭션
트랜잭션가 실패하는 경우에 consumer에서 메시지가 처리되는 원인을 확인하기 위해 kafka 로그를 한번 확인해보자.(org.apache.kafka: info
설정 필요)
Consumer config를 보면 기본값으로 read_uncommitted
로 설정이 되어 있다.
isolation.level = read_uncommitted
read_uncommited는 아직 커밋되지 않은 레코드도 읽을 수 있다. 심지어 트랜잭션이 실패(abort)한 레코드도 읽는다. 즉 모든 레코드를 읽는다.
우리가 하고자 하는 것은 커밋된 레코드만 읽도록 하는 것인데 이 문제를 해결하기 위해서는 read_uncommitted
를 read_committed
로 변경을 하면 된다.
spring:
kafka:
consumer:
isolation-level: read_committed
이렇게 설정하고 실패하는 테스트 케이스를 다시 실행해보자.
실행결과
Getting transaction for [com.example.user.service.UserService.register]
Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@226e95e9]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@226e95e9] beginTransaction()
Sending: ProducerRecord(topic=event-user-topic, partition=null,
Sent: ProducerRecord(topic=event-user-topic, partition=null
Sent ok: ProducerRecord(topic=event-user-topic, partition=null,
Completing transaction for [com.example.user.service.UserService.register] after exception: java.lang.IllegalArgumentException
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@226e95e9] abortTransaction()
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@226e95e9] close(PT5S)
트랜잭션이 실패하는 경우에는 Consumer가 메시지를 수신하지 않는 것을 확인할 수 있다.
그러면 성공하는 경우에는 어떤지 확인해보자.
@Test
public void register() throws InterruptedException {
userService.register("홍길동");
Thread.sleep(1000);
}
실행결과
Getting transaction for [com.example.user.service.UserService.register]
Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@6ed7c178]
.KafkaProducer@6ed7c178] beginTransaction()
Sending: ProducerRecord(topic=event-user-topic, partition=null,
Sent: ProducerRecord(topic=event-user-topic, partition=null,
Sent ok: ProducerRecord(topic=event-user-topic, partition=null,
Completing transaction for [com.example.user.service.UserService.register]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@6ed7c178] commitTransaction()
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@6ed7c178] close(PT5S)
received userEvent: {
"name" : "홍길동"
}
이제 트랜잭션이 종료된 이후에 subscriber가 메시지를 수신하는 것을 알 수 있다. 이제 모든 subscriber는 트랜잭션이 종료된 메시지를 수신한다.
Kafka consumer의 isolation.level
의 기본값이 read_uncommitted
으로 되어 있으니 트랜잭션을 사용할 때는 read_committed
로 변경을 해야한다.
트랜잭션을 사용하지 않을 경우에는 read_uncommitted으로 동작을 하지만 트랜잭션을 사용할 경우에는 isolation.level을 반드시 신경써야 할 것 같다.
정리
카프카에서 트랜잭션을 사용하기 위해서는 아래 설정이 필요하다.
- spring.kafka.producer.transaction-id-prefix: tx-
- spring.kafka.consumer.isolation-level: read_committed