다른 2개의 서비스에서 동일 토픽을 다른 클래스(서비스내 클래스)로 매핑하는 방법이다.
[A서비스에서 메시지 발행]
topicId: message-topic
message는 2가지 발행:service1
.model.TextMessage,service1
.model.FileMessage
[B서비스에서 메시지 구독]
topicId: message-topic
message는 2가지 발행:service2
.model.TextMessage,service2
.model.FileMessage
spring kafka에서 발행한 메시지의 헤더를 보면 __TypeID__
라는 것이 있다. 이것은 메시지를 보낼 때의 데이터 객체 타입을 매핑하여 전송하게 되고 실제 수신받는 컨슈머는 이 객체타입을 사용하여 deserialize하게 된다. (소스: DefaultJackson2JavaTypeMapper)
그래서 발행과 구독을 다른 클래스로 매핑하는 것은 기본적으로 안된다. 그래서 사용할 수 있는 방법은 아래 2가지이다.
- header를 보지 않는 방법
- type mapping을 사용하는 방법
1. header를 보지 않는 방법
아래와 같이 spring.json.use.type.headers:false를 추가하고 기본 타입을 지정하면 해당 토픽(message-topic)의 모든 타입은 지정한 타입(service2.model.TextMessage)으로 매핑이 된다.
@KafkaListener(
topics = "message-topic",
groupId = "message-group,
properties = {
"spring.json.value.default.type:service2.model.TextMessage",
"spring.json.use.type.headers:false"
}
)
하지만 이런 경우 하나의 토픽에 여러 타입이 사용되는 경우는 사용할 수가 없다.
예를 들어 message-topic에 TextMessage, FileMessage 등 2개의 타입이 있더라도 반드시 하나만 사용을 해야 하는 문제가 있다.
2. type mapping을 사용하는 방법
두 번째 방법은 타입 매핑을 사용하는 방법이다. 발행하는 클래스 타입을 구독하는 클래스 타입으로 매핑하여 적용하는 방법이다.
아래 경우처럼 service1의 TextMessage를 service2의 TextMessage로 매핑을 한다.
(service1.model.TextMessage --> service2.model.TextMessage)
다음처럼 application.yml에 매핑 정보를 추가하여 사용하면 된다.
[application.yml]
spring:
kafka:
consumer:
properties:
spring.json.type.mapping: service1.model.TextMessage:service2.model.TextMessage,service1.model.FileMessage:service2.model.FileMessage
MessageSubscriber.java
@Component
@KafkaListener(
topics = "message-topic",
groupId = "message-group"
)
public class MessageSubscriber {
@KafkaHandler
public void handle(TextMessage textMessage) {
System.out.println("textMessage");
}
@KafkaHandler
public void handle(FileMessage fileMessage) {
System.out.println("fileMessage");
}
@KafkaHandler(isDefault = true)
public void ignore(JsonSerializable jsonSerializable) {
System.out.println("ignore");
}
}
2번 방법으로 사용을 하면 하나의 토픽에 두 개 이상의 타입도 사용할 수가 있다.