kafka / / 2024. 3. 30. 19:07

Spring Kafka 메시지 수신을 하는 다양한 방법

Spring Boot에서 Kafka 메시지를 수신하는 방법은 여러가지가 있다. 기본적인 방법부터 헤더정보 가져오는 방법, 여러 토픽을 한번에 받는 방법, 설정파일에 지정한 토픽이름으로 받아오는 방법 등 다양한 수신 방법에 대해서 정리한 내용이다.

1. 기본

spring-kafka를 사용하여 메시지를 수신하기 위해서는 기본적으로 @KafkaListenr를 통해 받을 수 있다. 아래 예는 message.command 토픽을 수신하는 방법이다.

@Component
@KafkaListener(
    topics = "message.command",
    groupId = "message.groupId"
)
public class MessageSubscriber {

    @KafkaHandler
    public void handle(Message message) {
        System.out.println("Message received: " + message);
    }
}

2. 헤더 정보 조회

일반적으로 메시지 내용만 관심대상이지만 메시지 헤더 정보가 필요한 경우도 있다. 여기서는 메시지 내용 외에 메시지 헤더 정보를 가져오는 방법이다. 이런 경우 메소드 내에서 @KafkaListener를 사용하고 @Headers를 통해 받을 수 있다.

@Component
public class MessageSubscriber {

  @KafkaListener(
    topics = "message.command",
    groupId = "message.groupId"
  )
  public void handle(
            @Headers MessageHeaders messageHeaders, 
            @Payload Message data
    ) {
        System.err.println("messageHeaders: " + messageHeaders);
        System.err.println("data: " + data);
    }
}

3. 여러 개의 토픽을 받을 경우

두 개 이상의 토픽을 수신하는 경우는 아래와 같이 배열로 지정하면 된다.

@Component
public class MessageSubscriber {

  @KafkaListener(
    topics ={"message.command", "message.event"},
    groupId = "message.groupId"
  )
  public void handle(
            @Headers MessageHeaders messageHeaders, 
            @Payload Message data
    ) {
        System.err.println("messageHeaders: " + messageHeaders);
        System.err.println("data: " + data);
    }
}

4. 토픽의 이름을 패턴 형식으로 사용하는 경우

토픽이 message.command와 message.event와 같이 토픽이름이 message로 시작하는 경우 message.* 형식으로 지정하면 된다.

@Component
public class MessageSubscriber {

  @KafkaListener(
    topicPattern = "message.*",
    groupId = "message.groupId"
  )
  public void handle(
            @Headers MessageHeaders messageHeaders, 
            @Payload Message data
    ) {
        System.err.println("messageHeaders: " + messageHeaders);
        System.err.println("data: " + data);
    }
}

하지만 *으로 시작하는 *.command 와 같은 형식은 지원하지 않는 듯 하다.

그리고 새로운 토픽은 기본적으로 metadata.max.age.ms에 지정된 시간(5분) 내에 수신이 된다.

5. 프로퍼티에 지정된 여러 개의 토픽 정보를 받을 경우

application.yml에 여러 개의 토픽정보를 지정해놓고 받고 싶은 경우는 아래와 같이 하면 된다.

application.yml에 아래와 같이 ,로 구분하여 지정한다.

kafka.listen.topics: message.command,message.event

그리고 topics에 SPEL로 split으로 구분한다.

@Component
public class MessageSubscriber {

  @KafkaListener(
    topics = "#{'${kafka.listen.topics}'.split(',')}",
    groupId = "message.groupId"
  )
  public void handle(
            @Headers MessageHeaders messageHeaders, 
            @Payload Message message
    ) {
        System.err.println("messageHeaders: " + messageHeaders);
        System.err.println("data: " + data);
    }
}

6. 토픽의 헤더를 상수로 조회

@Component
public class MessageSubscriber {

  @KafkaListener(
    topics = "#{'${kafka.listen.topics:_no_topic}'.split(',')}",
    groupId = "message.groupId"
  )
  public void handle(
          @Header(name = KafkaHeaders.OFFSET, required = false) Long offset,
            @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
            @Headers MessageHeaders messageHeaders,
            @Payload Message data
    ) {
        System.out.println("offset: " + offset);
        System.out.println("key: " + key);
        System.out.println("partition: " + partition);
        System.out.println("topic: " + topic);
        System.out.println("ts: " + ts);
        System.out.println("messageHeaders: " + messageHeaders);
        System.out.println("data: " + data);
    }
}
반응형
  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유