source

Spring Kafka를 사용하여 한 번의 트랜잭션으로 두 개의 Kafka 주제에 쓰기

ittop 2023. 7. 21. 21:57
반응형

Spring Kafka를 사용하여 한 번의 트랜잭션으로 두 개의 Kafka 주제에 쓰기

저는 카프카의 트랜잭션 기능을 사용하여 트랜잭션 내에서 두 가지 주제에 글을 쓸 수 있는 방법이 있는지 알아보려고 합니다.

저는 카프카의 거래를 사용하는 전형적인 시나리오가 소비자-생산자 패턴이며 잘 문서화된 것으로 보인다는 것을 알고 있습니다.

제가 시도한 것:

  1. 를 생성했습니다.KafkaTransactionManager주제별로
  2. 각각 구성된ProducerFactory각각의 트랜잭션 관리자를 사용합니다.
  3. 생성됨ChainedTransactionManger의 두 가지 예로KafkaTransactionManager
  4. 생성됨KafkaTemplate주제별로

    그런 다음 사용했습니다.@Transactional(transactionManager = "chainedTx")다음을 수행하는 메서드에 대한 주석:

    template1.send("topic1", "example payload");
    template2.send("topic2", "example payload");
    

이거 안 돼요. 그.KafkaTemplate거래적이지만, 그 때.send()메소드가 호출되고, 진행 중인 트랜잭션이 없으며, 저는 다음 메시지를 받습니다.IllegalStateException.

저는 그것을 시도하려고 했습니다.KafkaTemplate.executeInTransaction()하지만 자바독은 이것이 지역 거래만을 위한 것이므로 내 필요에 맞지 않는 것으로 보인다고 말합니다.

다음 단계는 카프카의 Producer API를 직접 사용하여 이 패턴이 작동하는지 확인하는 것입니다. 하지만 누군가 제가 시간을 낭비하고 있고 카프카가 여러 주제에 대한 트랜잭션 쓰기를 지원하지 않는다는 것을 알려주시면 감사하겠습니다.

저는 Confluent의 Kafka 트랜잭션 지원 블로그에서 다음과 같은 문구를 발견했습니다.

트랜잭션을 사용하여 여러 Kafka 항목 및 파티션에 원자적으로 쓸 수 있습니다...

하지만 저는 그것을 증명하는 어떤 예도 찾지 못했습니다.

첫 번째 생산자의 구성

@구성 공용 클래스 ControlProducerConfig {

@Bean("controlTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
    return  new KafkaTransactionManager<>(factory());
}

@Bean("controlTemplate")
public KafkaTemplate<String, String> template() {
    return new KafkaTemplate<>(factory());
}

private ProducerFactory<String, String> factory() {
    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
    factory.setTransactionIdPrefix("abcd");
    return factory;
}

private Map<String, Object> config() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");

    props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

    // you can't set idempotence without setting max in flight requests to <= 5
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");

    return props;
}

}

두 번째 생산자의 구성

@Configuration
public class PayloadProducerConfig {


@Bean("payloadTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
    return new KafkaTransactionManager<>(factory());
}

@Bean("payloadTemplate")
public KafkaTemplate<String, String> template() {
    return new KafkaTemplate<>(factory());
}

private ProducerFactory<String, String> factory() {
    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
    factory.setTransactionIdPrefix("abcd");
    return factory;
}

private Map<String, Object> config() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");

    props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

    // you can't set idempotence without setting max in flight requests to <= 5
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");

    return props;
}

}

메인클래스

@EnableTransactionManagement
@SpringBootApplication
public class App {

public static void main(String[] args) {
    SpringApplication.run(App.class, args);
}

@Bean("chainedTx")
public ChainedTransactionManager chained(
    @Qualifier("controlTransactionManager") KafkaTransactionManager controlTransactionManager,
    @Qualifier("payloadTransactionManager") KafkaTransactionManager payloadTransactionManager) {

    return new ChainedTransactionManager(controlTransactionManager, payloadTransactionManager);
}

@Bean OnStart onStart(PostTwoMessages postTwoMessages) {
    return new OnStart(postTwoMessages);
}

@Bean
public PostTwoMessages postTwoMessages(
    @Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
    @Qualifier("controlTemplate") KafkaTemplate<String, String> payloadTemplate) {

    return new PostTwoMessages(controlTemplate, payloadTemplate);
}

}

응용 프로그램 시작 시

public class OnStart implements ApplicationListener<ApplicationReadyEvent> {

private PostTwoMessages postTwoMessages;

public OnStart(PostTwoMessages postTwoMessages) {
    this.postTwoMessages = postTwoMessages;
}

@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
    postTwoMessages.run();
}

}

두 개의 메시지 게시

public class PostTwoMessages  {

private final KafkaTemplate<String, String> controlTemplate;
private final KafkaTemplate<String, String> payloadTemplate;

public PostTwoMessages(
    @Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
    @Qualifier("payloadTemplate") KafkaTemplate<String, String> payloadTemplate) {

    this.controlTemplate = controlTemplate;
    this.payloadTemplate = payloadTemplate;
}

@Transactional(transactionManager = "chainedTx")
public void run() {
    UUID uuid = UUID.randomUUID();
    controlTemplate.send("private.s0869y.trx.model3a", "control: " + uuid);
    payloadTemplate.send("private.s0869y.trx.model3b", "payload: " + uuid);
}

}

효과가 있을 것입니다; 당신은 가지고 있습니까?@EnableTransactionManagement?

그러나 트랜잭션은 두 개의 서로 다른 생산자에 걸쳐 있을 수 없습니다. 동일한 템플릿을 사용하여 두 개의 전송을 모두 수행해야 합니다.그렇지 않으면 두 개의 다른 거래입니다.

편집

다음은 Spring Boot 애플리케이션의 예입니다.

EDIT2

다음을 통해 로컬 트랜잭션을 사용하여 표시하는 예제 업데이트executeInTransaction.

@SpringBootApplication
public class So54865968Application {

    public static void main(String[] args) {
        SpringApplication.run(So54865968Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.runInTx();
            System.out.println("Committed 1");
            foo.runInLocalTx();
            System.out.println("Committed 2");
        };
    }

    @Bean
    public Foo foo(KafkaTemplate<String, Object> template) {
        return new Foo(template);
    }

    @Bean
    public Bar bar() {
        return new Bar();
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("so54865968-1", 1, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("so54865968-2", 1, (short) 1);
    }

    public static class Foo {

        private final KafkaTemplate<String, Object> template;

        public Foo(KafkaTemplate<String, Object> template) {
            this.template = template;
        }

        @Transactional(transactionManager = "kafkaTransactionManager")
        public void runInTx() throws InterruptedException {
            this.template.send("so54865968-1", 42);
            this.template.send("so54865968-2", "texttest");
            System.out.println("Sent 2; waiting a few seconds to commit");
            Thread.sleep(5_000);
        }

        public void runInLocalTx() throws InterruptedException {
            this.template.executeInTransaction(t -> {
                t.send("so54865968-1", 43);
                t.send("so54865968-2", "texttest2");
                System.out.println("Sent 2; waiting a few seconds to commit");
                try {
                    Thread.sleep(5_000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return true;
            });
        }

    }

    public static class Bar {

        @KafkaListener(id = "foo", topics = { "so54865968-1", "so54865968-2" })
        public void haandler(byte[] bytes) {
            if (bytes.length == 4) {
                ByteBuffer bb = ByteBuffer.wrap(bytes);
                System.out.println("Received int " + bb.getInt());
            }
            else {
                System.out.println("Received string " + new String(bytes));
            }
        }

    }

}

그리고.

spring.kafka.producer.transaction-id-prefix=tx-id
spring.kafka.producer.properties.value.serializer=com.example.CompositeSerializer

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.properties.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

그리고.

public class CompositeSerializer implements Serializer<Object> {

    private final StringSerializer stringSerializer = new StringSerializer();

    private final IntegerSerializer intSerializer = new IntegerSerializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        return data instanceof Integer ? intSerializer.serialize(topic, (Integer) data)
                : stringSerializer.serialize(topic, (String) data);
    }

    @Override
    public void close() {
    }

}

그리고.

Received int 42
Received string texttest

두 사람 모두 5초간의 정지 후에 나타났습니다.

언급URL : https://stackoverflow.com/questions/54865968/write-to-two-kafka-topics-in-a-single-transaction-using-spring-kafka

반응형