From 9ab1db48495060931e2ce547d791109e248b5d5b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 24 Jan 2024 22:46:32 +0100 Subject: [PATCH] WIP --- src/main/java/de/juplo/kafka/SimpleProducer.java | 1 + src/test/java/de/juplo/kafka/ApplicationTests.java | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/SimpleProducer.java b/src/main/java/de/juplo/kafka/SimpleProducer.java index 2b5b441..b9e8152 100644 --- a/src/main/java/de/juplo/kafka/SimpleProducer.java +++ b/src/main/java/de/juplo/kafka/SimpleProducer.java @@ -33,6 +33,7 @@ public class SimpleProducer this.id = clientId; this.topic = topic; producer = new KafkaProducer<>(props); + producer.initTransactions(); } public void run() diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index bfb665f..d72ddfb 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -16,7 +16,12 @@ import static org.awaitility.Awaitility.*; @ExtendWith(SpringExtension.class) -@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +@EmbeddedKafka( + topics = TOPIC, + partitions = PARTITIONS, + brokerProperties = { + "transaction.state.log.replication.factor=1", + "transaction.state.log.min.isr=1" }) @Slf4j public class ApplicationTests { -- 2.20.1