일반적으로 특정 도메인 객체의 상태를 변경하고 이벤트를 발행하는 경우가 많이 있다. 예를 들면 사용자가 등록되고 등록된 사용자를 다른 서비스에서 이벤트를 수신한 후에 후속처리가 필요한 경우이다. 이 이벤트를 받은 서비스들은 사용자 쿠폰발행이나 사용자 활동, 부가 서비스 등을 등록하는 행위를 수행한다.
이런 환경에서 사용할 때 사용자 등록에 대한 트랜잭션이 커밋되고 이벤트를 발행하게 된다. 이런 경우에 발생하는 문제에 대해 테스트를 한번 해보자.
사용자 서비스
기본적인 사용자 등록하는 서비스를 만들어보자.
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
</dependencies>
UserController
@RestController
@RequestMapping("users")
@RequiredArgsConstructor
public class UserController {
private final UserService userService;
@PostMapping
public void register(@RequestBody User user) {
userService.register(user);
}
@GetMapping("{userId}")
public User find(@PathVariable String userId) {
return userService.find(userId);
}
}
UserService
@Service
@RequiredArgsConstructor
public class UserService {
private final KafkaTemplate kafkaTemplate;
private final UserRepository userRepository;
@Transactional
public void register(User user) {
userRepository.save(new UserJpo(user));
}
public User find(String userId) {
return userRepository.findById(userId)
.map(UserJpo::toDomain)
.orElseThrow(() -> new IllegalArgumentException("User not found: " + userId));
}
}
UserJpo
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Getter
public class UserJpo {
@Id
private String userId;
private String userName;
public UserJpo(User user) {
this.userId = user.getUserId();
this.userName = user.getUserName();
}
public User toDomain() {
return new User(this.userId, this.userName);
}
}
UserRepository
public interface UserRepository extends JpaRepository<UserJpo, String> {
}
UserApplication
@SpringBootApplication
public class UserApplication {
public static void main(String[] args) {
SpringApplication.run(UserApplication.class);
}
}
spring kafka 설정
이벤트 발송은 kafka를 통해 발송한다. spring kafka 사용을 위한 설정을 해보자.
pom.xml 추가
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
KafkaProducerConfig 설정
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
UserCreated 이벤트 발송
사용자 등록이 성공하면 UserCreated 이벤트를 발송해보자. 이벤트를 kafka를 통해 발송한다.
이 로직은 UserService에 추가한다.
@Transactional
public void register(User user) {
userRepository.save(new UserJpo(user)); // DB에 사용자 등록
kafkaTemplate.send("user-topic", new UserCreatedEvent(user)); // User 이벤트 발송
}
@NoArgsConstructor
@AllArgsConstructor
@Getter
public class UserCreatedEvent implements JsonSerializable {
private User user;
}
API 호출
API를 호출해보자.
POST http://localhost:8080/users
content-Type: application/json
{
"userId": "rudaks2",
"userName": "루닥스"
}
성공적으로 동작하는 것을 확인할 수 있다.
실패 케이스
위의 이벤트가 성공적으로 발행될거라고 예상되지만 실패하는 경우는 어떻게 될까?
실패하는 케이스는 다음과 같은 경우가 발생할 수 있다.
- 트랜잭션 실패 (User 등록 및 이벤트 발송이 되었지만 commit하다가 오류 발생)
- Kafka의 비정상 동작 (다운)
1. 트랜잭션 실패
@Transactional
public void register(User user) {
userRepository.save(new UserJpo(user)); // DB에 사용자 등록
kafkaTemplate.send("user-topic", new UserCreatedEvent(user));
}
여기서 save와 send 모두 성공하고 트랜잭션이 commit되는 순간 실패하는 경우에 대한 처리는 어떻게 할까? 트랜잭션이 commit된 이후에 메시지 발송이 되면 좋지 않을까?
트랜잭션 commit이 실패하는 경우를 만들기는 쉽지 않으므로 일단 로그로 확인해보도록 하자.
스프링에서 트랜잭션 로그를 활성화하자.
logging:
level:
org.springframework.transaction.interceptor: trace
UserService
@Transactional
public void register(User user) {
userRepository.save(new UserJpo(user)); // DB에 사용자 등록
kafkaTemplate.send("user-topic", new UserCreatedEvent(user));
log.debug("send UserCreatedEvent");
}
다시 실행해보자.
[실행로그]
2023-08-22 08:09:56.692 DEBUG 44240 --- [nio-8080-exec-1] com.example.user.service.UserService : send UserCreatedEvent
2023-08-22 08:09:56.692 TRACE 44240 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.user.service.UserService.register]
이벤트를 발행하고 트랜잭션 commit이 된 것을 확인할 수 있다.
우리가 원하는 것은 이 순서를 바꾸는 방법인데 @TransactionalEventListener
으로 순서를 바꿀 수 있다.
@TransactionalEventListener를 통해서 트랜잭션 커밋전/후, 롤백전/후에 이벤트를 발생하는 위치를 설정할 수 있다.
UserService를 다시 변경해보자.
private final UserEventPublisher userEventPublisher;
@Transactional
public void register(User user) {
userRepository.save(new UserJpo(user)); // DB에 사용자 등록
userEventPublisher.register(user);
}
@Component
@RequiredArgsConstructor
@Slf4j
public class UserEventPublisher {
private final ApplicationEventPublisher applicationEventPublisher;
private final KafkaTemplate kafkaTemplate;
public void register(User user) {
applicationEventPublisher.publishEvent(user);
}
@TransactionalEventListener
public void handle(User user) {
kafkaTemplate.send("user-topic", new UserCreatedEvent(user));
log.debug("send UserCreatedEvent");
}
}
다시 실행해보면
2023-08-22 08:23:34.793 TRACE 45163 --- [nio-8080-exec-5] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.user.service.UserService.register]
2023-08-22 08:23:34.793 DEBUG 45163 --- [nio-8080-exec-5] c.e.user.service.UserEventPublisher : send UserCreatedEvent
트랜잭션이 종료되고 이벤트가 발행하는 것을 확인할 수 있다.
2. Kafka의 비정상 동작 (다운)
기본적으로 kafka producer에서 send를 하면 비동기로 전송이 된다. 즉, Broker로 바로 전송하지 않고 Producer의 buffer에 저장된 후 전송이 된다.
이 상황을 테스트 하기 위해 API 호출 전에 kafka를 shutdown 시켜놓고 실행해보자.
[실행결과]
API는 정상적으로 실행이 되었다고 하지만 kafka 연결이 끊어졌다는 메시지가 나온다.
2023-08-22 08:00:24.265 INFO 43371 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node 0 disconnected.
2023-08-22 08:00:24.266 WARN 43371 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2
이는 비동기로 실행이 되어서 응답값을 받지 않아서 발생하는 문제이며 동기로 호출하는 코드를 작성해보자.
@Component
@RequiredArgsConstructor
@Slf4j
public class UserEventPublisher {
...
@TransactionalEventListener
public void handle(User user) throws ExecutionException, InterruptedException, TimeoutException {
kafkaTemplate.send("user-topic", new UserCreatedEvent(user)).get(3000, TimeUnit.MILLISECONDS);
log.debug("send UserCreatedEvent");
}
}
동기로 호출하고 3초간 응답을 대기하는 코드이다. kafka가 죽어있는 상태라면 아래 오류를 만나게 된다.
2023-08-22 08:38:37.565 ERROR 45934 --- [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='com.example.user.event.UserCreatedEvent@47c0a12' to topic user-topic:
하지만 위에서 @TransactionalEventListener로 처리했기 때문에 이벤트 발송이 실패하더라도 트랜잭션을 롤백할 수 없다.
이벤트 발송 로직을 서비스 코드에 넣어서 다시 실행하면 예외처리가 되겠지만 최초 검토했었던 다른 문제(commit 실패 시)가 발생한다.
그래서 이벤트 발송을 트랜잭션과 묶어서 같이 사용할 수 있는 Transactional outbox pattern이 필요하다.
Transactional Outbox Pattern
메시지를 전송하는 서비스에서 데이터베이스에 먼저 저장하고 나서 이벤트를 처리하는 패턴
이다. 이렇게 함으로써 트랜잭션 및 메시지 브로커의 다른 상태(실패)에 따른 메시지 발행이 개별적으로 동작하지 않고 DB 트랜잭션에 의존적으로 동작하게 함으로써 일관성을 지킬 수 있는 패턴이다.
소스는 간단하다. 기존에 사용자를 저장하고 메시지 발행하는 코드에서 메시지 발행하는 로직을 DB에 저장하는 것으로 변경한다.
@Transactional
public void register(User user) {
userRepository.save(new UserJpo(user)); // DB에 사용자 등록
messagePublishingRepository.save(new MessageJpo(toJson(user)));
}
그리고 저장된 메시지를 읽어서 발송하는 로직을 추가하면 된다. 기본적으로 스프링 스케쥴러로 발송하면 된다.
@Component
@Slf4j
public class MessagePublishingJob {
@Scheduled(fixedRate = 1000)
public void send() {
// find
// send
// delete
}
}