Spring Boot에서 Kafka 메시지를 수신하는 방법은 여러가지가 있다. 기본적인 방법부터 헤더정보 가져오는 방법, 여러 토픽을 한번에 받는 방법, 설정파일에 지정한 토픽이름으로 받아오는 방법 등 다양한 수신 방법에 대해서 정리한 내용이다.
1. 기본
spring-kafka를 사용하여 메시지를 수신하기 위해서는 기본적으로 @KafkaListenr를 통해 받을 수 있다. 아래 예는 message.command
토픽을 수신하는 방법이다.
@Component
@KafkaListener(
topics = "message.command",
groupId = "message.groupId"
)
public class MessageSubscriber {
@KafkaHandler
public void handle(Message message) {
System.out.println("Message received: " + message);
}
}
2. 헤더 정보 조회
일반적으로 메시지 내용만 관심대상이지만 메시지 헤더 정보가 필요한 경우도 있다. 여기서는 메시지 내용 외에 메시지 헤더 정보를 가져오는 방법이다. 이런 경우 메소드 내에서 @KafkaListener를 사용하고 @Headers
를 통해 받을 수 있다.
@Component
public class MessageSubscriber {
@KafkaListener(
topics = "message.command",
groupId = "message.groupId"
)
public void handle(
@Headers MessageHeaders messageHeaders,
@Payload Message data
) {
System.err.println("messageHeaders: " + messageHeaders);
System.err.println("data: " + data);
}
}
3. 여러 개의 토픽을 받을 경우
두 개 이상의 토픽을 수신하는 경우는 아래와 같이 배열로 지정하면 된다.
@Component
public class MessageSubscriber {
@KafkaListener(
topics ={"message.command", "message.event"},
groupId = "message.groupId"
)
public void handle(
@Headers MessageHeaders messageHeaders,
@Payload Message data
) {
System.err.println("messageHeaders: " + messageHeaders);
System.err.println("data: " + data);
}
}
4. 토픽의 이름을 패턴 형식으로 사용하는 경우
토픽이 message.command와 message.event와 같이 토픽이름이 message로 시작하는 경우 message.*
형식으로 지정하면 된다.
@Component
public class MessageSubscriber {
@KafkaListener(
topicPattern = "message.*",
groupId = "message.groupId"
)
public void handle(
@Headers MessageHeaders messageHeaders,
@Payload Message data
) {
System.err.println("messageHeaders: " + messageHeaders);
System.err.println("data: " + data);
}
}
하지만
*
으로 시작하는*.command
와 같은 형식은 지원하지 않는 듯 하다.그리고 새로운 토픽은 기본적으로
metadata.max.age.ms
에 지정된 시간(5분) 내에 수신이 된다.
5. 프로퍼티에 지정된 여러 개의 토픽 정보를 받을 경우
application.yml에 여러 개의 토픽정보를 지정해놓고 받고 싶은 경우는 아래와 같이 하면 된다.
application.yml에 아래와 같이 ,
로 구분하여 지정한다.
kafka.listen.topics: message.command,message.event
그리고 topics에 SPEL로 split으로 구분한다.
@Component
public class MessageSubscriber {
@KafkaListener(
topics = "#{'${kafka.listen.topics}'.split(',')}",
groupId = "message.groupId"
)
public void handle(
@Headers MessageHeaders messageHeaders,
@Payload Message message
) {
System.err.println("messageHeaders: " + messageHeaders);
System.err.println("data: " + data);
}
}
6. 토픽의 헤더를 상수로 조회
@Component
public class MessageSubscriber {
@KafkaListener(
topics = "#{'${kafka.listen.topics:_no_topic}'.split(',')}",
groupId = "message.groupId"
)
public void handle(
@Header(name = KafkaHeaders.OFFSET, required = false) Long offset,
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
@Headers MessageHeaders messageHeaders,
@Payload Message data
) {
System.out.println("offset: " + offset);
System.out.println("key: " + key);
System.out.println("partition: " + partition);
System.out.println("topic: " + topic);
System.out.println("ts: " + ts);
System.out.println("messageHeaders: " + messageHeaders);
System.out.println("data: " + data);
}
}
반응형