Spring Kafka를 사용하여 한 번의 트랜잭션으로 두 개의 Kafka 주제에 쓰기
저는 카프카의 트랜잭션 기능을 사용하여 트랜잭션 내에서 두 가지 주제에 글을 쓸 수 있는 방법이 있는지 알아보려고 합니다.
저는 카프카의 거래를 사용하는 전형적인 시나리오가 소비자-생산자 패턴이며 잘 문서화된 것으로 보인다는 것을 알고 있습니다.
제가 시도한 것:
- 를 생성했습니다.
KafkaTransactionManager
주제별로 - 각각 구성된
ProducerFactory
각각의 트랜잭션 관리자를 사용합니다. - 생성됨
ChainedTransactionManger
의 두 가지 예로KafkaTransactionManager
생성됨
KafkaTemplate
주제별로그런 다음 사용했습니다.
@Transactional(transactionManager = "chainedTx")
다음을 수행하는 메서드에 대한 주석:template1.send("topic1", "example payload"); template2.send("topic2", "example payload");
이거 안 돼요. 그.KafkaTemplate
거래적이지만, 그 때.send()
메소드가 호출되고, 진행 중인 트랜잭션이 없으며, 저는 다음 메시지를 받습니다.IllegalStateException
.
저는 그것을 시도하려고 했습니다.KafkaTemplate.executeInTransaction()
하지만 자바독은 이것이 지역 거래만을 위한 것이므로 내 필요에 맞지 않는 것으로 보인다고 말합니다.
다음 단계는 카프카의 Producer API를 직접 사용하여 이 패턴이 작동하는지 확인하는 것입니다. 하지만 누군가 제가 시간을 낭비하고 있고 카프카가 여러 주제에 대한 트랜잭션 쓰기를 지원하지 않는다는 것을 알려주시면 감사하겠습니다.
저는 Confluent의 Kafka 트랜잭션 지원 블로그에서 다음과 같은 문구를 발견했습니다.
트랜잭션을 사용하여 여러 Kafka 항목 및 파티션에 원자적으로 쓸 수 있습니다...
하지만 저는 그것을 증명하는 어떤 예도 찾지 못했습니다.
첫 번째 생산자의 구성
@구성 공용 클래스 ControlProducerConfig {
@Bean("controlTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
return new KafkaTransactionManager<>(factory());
}
@Bean("controlTemplate")
public KafkaTemplate<String, String> template() {
return new KafkaTemplate<>(factory());
}
private ProducerFactory<String, String> factory() {
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
factory.setTransactionIdPrefix("abcd");
return factory;
}
private Map<String, Object> config() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
// you can't set idempotence without setting max in flight requests to <= 5
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");
return props;
}
}
두 번째 생산자의 구성
@Configuration
public class PayloadProducerConfig {
@Bean("payloadTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
return new KafkaTransactionManager<>(factory());
}
@Bean("payloadTemplate")
public KafkaTemplate<String, String> template() {
return new KafkaTemplate<>(factory());
}
private ProducerFactory<String, String> factory() {
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
factory.setTransactionIdPrefix("abcd");
return factory;
}
private Map<String, Object> config() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
// you can't set idempotence without setting max in flight requests to <= 5
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");
return props;
}
}
메인클래스
@EnableTransactionManagement
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
@Bean("chainedTx")
public ChainedTransactionManager chained(
@Qualifier("controlTransactionManager") KafkaTransactionManager controlTransactionManager,
@Qualifier("payloadTransactionManager") KafkaTransactionManager payloadTransactionManager) {
return new ChainedTransactionManager(controlTransactionManager, payloadTransactionManager);
}
@Bean OnStart onStart(PostTwoMessages postTwoMessages) {
return new OnStart(postTwoMessages);
}
@Bean
public PostTwoMessages postTwoMessages(
@Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
@Qualifier("controlTemplate") KafkaTemplate<String, String> payloadTemplate) {
return new PostTwoMessages(controlTemplate, payloadTemplate);
}
}
응용 프로그램 시작 시
public class OnStart implements ApplicationListener<ApplicationReadyEvent> {
private PostTwoMessages postTwoMessages;
public OnStart(PostTwoMessages postTwoMessages) {
this.postTwoMessages = postTwoMessages;
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
postTwoMessages.run();
}
}
두 개의 메시지 게시
public class PostTwoMessages {
private final KafkaTemplate<String, String> controlTemplate;
private final KafkaTemplate<String, String> payloadTemplate;
public PostTwoMessages(
@Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
@Qualifier("payloadTemplate") KafkaTemplate<String, String> payloadTemplate) {
this.controlTemplate = controlTemplate;
this.payloadTemplate = payloadTemplate;
}
@Transactional(transactionManager = "chainedTx")
public void run() {
UUID uuid = UUID.randomUUID();
controlTemplate.send("private.s0869y.trx.model3a", "control: " + uuid);
payloadTemplate.send("private.s0869y.trx.model3b", "payload: " + uuid);
}
}
효과가 있을 것입니다; 당신은 가지고 있습니까?@EnableTransactionManagement
?
그러나 트랜잭션은 두 개의 서로 다른 생산자에 걸쳐 있을 수 없습니다. 동일한 템플릿을 사용하여 두 개의 전송을 모두 수행해야 합니다.그렇지 않으면 두 개의 다른 거래입니다.
편집
다음은 Spring Boot 애플리케이션의 예입니다.
EDIT2
다음을 통해 로컬 트랜잭션을 사용하여 표시하는 예제 업데이트executeInTransaction
.
@SpringBootApplication
public class So54865968Application {
public static void main(String[] args) {
SpringApplication.run(So54865968Application.class, args);
}
@Bean
public ApplicationRunner runner(Foo foo) {
return args -> {
foo.runInTx();
System.out.println("Committed 1");
foo.runInLocalTx();
System.out.println("Committed 2");
};
}
@Bean
public Foo foo(KafkaTemplate<String, Object> template) {
return new Foo(template);
}
@Bean
public Bar bar() {
return new Bar();
}
@Bean
public NewTopic topic1() {
return new NewTopic("so54865968-1", 1, (short) 1);
}
@Bean
public NewTopic topic2() {
return new NewTopic("so54865968-2", 1, (short) 1);
}
public static class Foo {
private final KafkaTemplate<String, Object> template;
public Foo(KafkaTemplate<String, Object> template) {
this.template = template;
}
@Transactional(transactionManager = "kafkaTransactionManager")
public void runInTx() throws InterruptedException {
this.template.send("so54865968-1", 42);
this.template.send("so54865968-2", "texttest");
System.out.println("Sent 2; waiting a few seconds to commit");
Thread.sleep(5_000);
}
public void runInLocalTx() throws InterruptedException {
this.template.executeInTransaction(t -> {
t.send("so54865968-1", 43);
t.send("so54865968-2", "texttest2");
System.out.println("Sent 2; waiting a few seconds to commit");
try {
Thread.sleep(5_000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return true;
});
}
}
public static class Bar {
@KafkaListener(id = "foo", topics = { "so54865968-1", "so54865968-2" })
public void haandler(byte[] bytes) {
if (bytes.length == 4) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
System.out.println("Received int " + bb.getInt());
}
else {
System.out.println("Received string " + new String(bytes));
}
}
}
}
그리고.
spring.kafka.producer.transaction-id-prefix=tx-id
spring.kafka.producer.properties.value.serializer=com.example.CompositeSerializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.properties.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
그리고.
public class CompositeSerializer implements Serializer<Object> {
private final StringSerializer stringSerializer = new StringSerializer();
private final IntegerSerializer intSerializer = new IntegerSerializer();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Object data) {
return data instanceof Integer ? intSerializer.serialize(topic, (Integer) data)
: stringSerializer.serialize(topic, (String) data);
}
@Override
public void close() {
}
}
그리고.
Received int 42
Received string texttest
두 사람 모두 5초간의 정지 후에 나타났습니다.
언급URL : https://stackoverflow.com/questions/54865968/write-to-two-kafka-topics-in-a-single-transaction-using-spring-kafka
'source' 카테고리의 다른 글
springapplication.properties에서 토끼 대기열 수신을 비활성화하는 중 (0) | 2023.07.21 |
---|---|
정적 컨텐츠 스프링 부트 응용 프로그램 다시 로드 (0) | 2023.07.21 |
오라클에서 인덱스 생성 시간 추정 (0) | 2023.07.21 |
MySQL 쿼리 속도가 매우 느림 - 가끔 (0) | 2023.07.21 |
Super() Python 3과 Python 2를 사용하여 확장하는 Python (0) | 2023.07.21 |