출처: https://docs.spring.io/spring-boot/4.0-SNAPSHOT/reference/messaging/kafka.html
참고: 이 버전은 아직 개발 중이며 안정적인 것으로 간주되지 않습니다. 최신 안정 버전은 Spring Boot 3.5.6을 사용하세요!
Apache Kafka Support
Apache Kafka는 spring-kafka 프로젝트의 자동 구성을 제공하여 지원됩니다.
Kafka 구성은 spring.kafka.*의 외부 구성 속성으로 제어됩니다.
예를 들어, application.properties에 다음 섹션을 선언할 수 있습니다:
Properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
YAML
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
시작 시 토픽을 생성하려면 NewTopic 타입의 빈을 추가하세요.
토픽이 이미 존재하는 경우, 빈은 무시됩니다.
지원되는 더 많은 옵션은 KafkaProperties를 참조하세요.
Sending a Message
Spring의 KafkaTemplate은 자동 구성되며, 다음 예제와 같이 자신의 빈에서 직접 autowire할 수 있습니다:
Java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
public void someMethod() {
this.kafkaTemplate.send("someTopic", "Hello");
}
}
Kotlin
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {
// ...
fun someMethod() {
kafkaTemplate.send("someTopic", "Hello")
}
}
spring.kafka.producer.transaction-id-prefix 속성이 정의되어 있으면, KafkaTransactionManager가 자동으로 구성됩니다.
또한, RecordMessageConverter 빈이 정의되어 있으면, 자동 구성된 KafkaTemplate에 자동으로 연결됩니다.
Receiving a Message
Apache Kafka 인프라가 존재하는 경우, 리스너 엔드포인트를 생성하기 위해 모든 빈에 @KafkaListener로 어노테이션을 추가할 수 있습니다.KafkaListenerContainerFactory가 정의되어 있지 않으면, 기본값이 spring.kafka.listener.*에 정의된 키로 자동 구성됩니다.
다음 컴포넌트는 someTopic 토픽에 리스너 엔드포인트를 생성합니다:
Java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
Kotlin
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@KafkaListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
KafkaTransactionManager 빈이 정의되어 있으면, container factory에 자동으로 연결됩니다.
마찬가지로, RecordFilterStrategy, CommonErrorHandler, AfterRollbackProcessor 또는 ConsumerAwareRebalanceListener 빈이 정의되어 있으면, 기본 factory에 자동으로 연결됩니다.
리스너 타입에 따라, RecordMessageConverter 또는 BatchMessageConverter 빈이 기본 factory에 연결됩니다.
배치 리스너에 RecordMessageConverter 빈만 있는 경우, BatchMessageConverter로 래핑됩니다.
사용자 정의 ChainedKafkaTransactionManager는 일반적으로 자동 구성된 KafkaTransactionManager 빈을 참조하므로 @Primary로 표시되어야 합니다.
Kafka Streams
Spring for Apache Kafka는 StreamsBuilder 객체를 생성하고 스트림의 라이프사이클을 관리하는 factory bean을 제공합니다.kafka-streams가 classpath에 있고 @EnableKafkaStreams 어노테이션으로 Kafka Streams가 활성화되어 있는 한 Spring Boot는 필요한 KafkaStreamsConfiguration 빈을 자동 구성합니다.
Kafka Streams를 활성화한다는 것은 application id와 bootstrap servers를 설정해야 한다는 것을 의미합니다.
전자는 spring.kafka.streams.application-id를 사용하여 구성할 수 있으며, 설정되지 않은 경우 기본값은 spring.application.name입니다.
후자는 전역적으로 설정하거나 streams에 대해서만 특별히 재정의할 수 있습니다.
전용 속성을 사용하여 여러 추가 속성을 사용할 수 있습니다. 다른 임의의 Kafka 속성은 spring.kafka.streams.properties 네임스페이스를 사용하여 설정할 수 있습니다.
자세한 내용은 Additional Kafka Properties도 참조하세요.
factory bean을 사용하려면 다음 예제와 같이 StreamsBuilder를 @Bean에 연결하세요:
Java
import java.util.Locale;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue)
.to("ks1Out",
Produced.with(Serdes.Integer(), new org.springframework.kafka.support.serializer.JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase(Locale.getDefault()));
}
}
Kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
val stream = streamsBuilder.stream<Int, String>("ks1In")
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(),
org.springframework.kafka.support.serializer.JsonSerde()))
return stream
}
private fun uppercaseValue(key: Int, value: String): KeyValue<Int, String> {
return KeyValue(key, value.uppercase())
}
}
기본적으로 StreamsBuilder 객체에 의해 관리되는 스트림은 자동으로 시작됩니다.spring.kafka.streams.auto-startup 속성을 사용하여 이 동작을 사용자 정의할 수 있습니다.
더 고급 사용자 정의를 위해 StreamsBuilderFactoryBeanConfigurer를 구현하는 임의의 수의 빈을 등록할 수도 있습니다.
Additional Kafka Properties
자동 구성에서 지원하는 속성은 부록의 Integration Properties 섹션에 나와 있습니다.
대부분의 경우, 이러한 속성(하이픈 또는 camelCase)은 Apache Kafka의 점으로 구분된 속성에 직접 매핑됩니다.
자세한 내용은 Apache Kafka 문서를 참조하세요.
이름에 클라이언트 타입(producer, consumer, admin 또는 streams)이 포함되지 않은 속성은 공통으로 간주되며 모든 클라이언트에 적용됩니다.
이러한 공통 속성의 대부분은 필요한 경우 하나 이상의 클라이언트 타입에 대해 재정의될 수 있습니다.
Apache Kafka는 HIGH, MEDIUM 또는 LOW의 중요도로 속성을 지정합니다.
Spring Boot 자동 구성은 모든 HIGH 중요도 속성, 일부 선택된 MEDIUM 및 LOW 속성, 그리고 기본값이 없는 모든 속성을 지원합니다.
Kafka에서 지원하는 속성의 하위 집합만 KafkaProperties 클래스를 통해 직접 사용할 수 있습니다.
직접 지원되지 않는 추가 속성으로 개별 클라이언트 타입을 구성하려면 다음 속성을 사용하세요:
Properties
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
YAML
spring:
kafka:
properties:
"[prop.one]": "first"
admin:
properties:
"[prop.two]": "second"
consumer:
properties:
"[prop.three]": "third"
producer:
properties:
"[prop.four]": "fourth"
streams:
properties:
"[prop.five]": "fifth"
이것은 공통 prop.one Kafka 속성을 first로 설정하고(producers, consumers, admins 및 streams에 적용), prop.two admin 속성을 second로, prop.three consumer 속성을 third로, prop.four producer 속성을 fourth로, prop.five streams 속성을 fifth로 설정합니다.
다음과 같이 Spring Kafka JsonDeserializer를 구성할 수도 있습니다:
Properties
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
YAML
spring:
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
마찬가지로, 헤더에 타입 정보를 보내는 JsonSerializer의 기본 동작을 비활성화할 수 있습니다:
Properties
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
YAML
spring:
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
이 방법으로 설정된 속성은 Spring Boot가 명시적으로 지원하는 모든 구성 항목을 재정의합니다.
Testing with Embedded Kafka
Spring for Apache Kafka는 내장 Apache Kafka 브로커로 프로젝트를 테스트하는 편리한 방법을 제공합니다.
이 기능을 사용하려면 spring-kafka-test 모듈의 @EmbeddedKafka로 테스트 클래스에 어노테이션을 추가하세요.
자세한 내용은 Spring for Apache Kafka reference manual을 참조하세요.
Spring Boot 자동 구성이 앞서 언급한 내장 Apache Kafka 브로커와 작동하도록 하려면, 내장 브로커 주소(EmbeddedKafkaBroker에 의해 채워짐)에 대한 시스템 속성을 Apache Kafka용 Spring Boot 구성 속성으로 다시 매핑해야 합니다.
이를 수행하는 몇 가지 방법이 있습니다:
테스트 클래스에서 내장 브로커 주소를 spring.kafka.bootstrap-servers로 매핑하는 시스템 속성을 제공합니다:
Java
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
Kotlin
init {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
@EmbeddedKafka 어노테이션에 속성 이름을 구성합니다:
Java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
Kotlin
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
구성 속성에서 플레이스홀더를 사용합니다:
Properties
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
YAML
spring:
kafka:
bootstrap-servers: "${spring.embedded.kafka.brokers}"