kafka / / 2022. 12. 23. 07:36

KafkaListener에서 서로 다른 message mapping하기

다른 2개의 서비스에서 동일 토픽을 다른 클래스(서비스내 클래스)로 매핑하는 방법이다.

[A서비스에서 메시지 발행]
topicId: message-topic
message는 2가지 발행: service1.model.TextMessage, service1.model.FileMessage

[B서비스에서 메시지 구독]
topicId: message-topic
message는 2가지 발행: service2.model.TextMessage, service2.model.FileMessage

spring kafka에서 발행한 메시지의 헤더를 보면 __TypeID__라는 것이 있다. 이것은 메시지를 보낼 때의 데이터 객체 타입을 매핑하여 전송하게 되고 실제 수신받는 컨슈머는 이 객체타입을 사용하여 deserialize하게 된다. (소스: DefaultJackson2JavaTypeMapper)

그래서 발행과 구독을 다른 클래스로 매핑하는 것은 기본적으로 안된다. 그래서 사용할 수 있는 방법은 아래 2가지이다.

  1. header를 보지 않는 방법
  2. type mapping을 사용하는 방법

1. header를 보지 않는 방법

아래와 같이 spring.json.use.type.headers:false를 추가하고 기본 타입을 지정하면 해당 토픽(message-topic)의 모든 타입은 지정한 타입(service2.model.TextMessage)으로 매핑이 된다.

@KafkaListener(
    topics = "message-topic",
    groupId = "message-group,
    properties = {
        "spring.json.value.default.type:service2.model.TextMessage",
        "spring.json.use.type.headers:false"
    }
)

하지만 이런 경우 하나의 토픽에 여러 타입이 사용되는 경우는 사용할 수가 없다.
예를 들어 message-topic에 TextMessage, FileMessage 등 2개의 타입이 있더라도 반드시 하나만 사용을 해야 하는 문제가 있다.

2. type mapping을 사용하는 방법

두 번째 방법은 타입 매핑을 사용하는 방법이다. 발행하는 클래스 타입을 구독하는 클래스 타입으로 매핑하여 적용하는 방법이다.

아래 경우처럼 service1의 TextMessage를 service2의 TextMessage로 매핑을 한다.
(service1.model.TextMessage --> service2.model.TextMessage)

다음처럼 application.yml에 매핑 정보를 추가하여 사용하면 된다.

[application.yml]

spring:
  kafka:
    consumer:
      properties:
        spring.json.type.mapping: service1.model.TextMessage:service2.model.TextMessage,service1.model.FileMessage:service2.model.FileMessage

MessageSubscriber.java

@Component
@KafkaListener(
        topics = "message-topic",
        groupId = "message-group"
)
public class MessageSubscriber {

    @KafkaHandler
    public void handle(TextMessage textMessage) {
        System.out.println("textMessage");
    }

    @KafkaHandler
    public void handle(FileMessage fileMessage) {
        System.out.println("fileMessage");
    }

    @KafkaHandler(isDefault = true)
    public void ignore(JsonSerializable jsonSerializable) {
        System.out.println("ignore");
    }
}

2번 방법으로 사용을 하면 하나의 토픽에 두 개 이상의 타입도 사용할 수가 있다.

반응형
  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유