From e1e2990a0c20c7eff508356191955597fee3a7c0 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 24 Jan 2024 22:55:41 +0100 Subject: [PATCH] WIP --- pom.xml | 5 ++++ .../java/de/juplo/kafka/SimpleProducer.java | 10 +++++++ .../java/de/juplo/kafka/ApplicationTests.java | 27 +++++++++++++++---- 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 1c6d364..458b0e3 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,11 @@ awaitility test + + org.assertj + assertj-core + test + diff --git a/src/main/java/de/juplo/kafka/SimpleProducer.java b/src/main/java/de/juplo/kafka/SimpleProducer.java index b9e8152..3cf1128 100644 --- a/src/main/java/de/juplo/kafka/SimpleProducer.java +++ b/src/main/java/de/juplo/kafka/SimpleProducer.java @@ -58,6 +58,16 @@ public class SimpleProducer } } + void begin() + { + producer.beginTransaction(); + } + + void commit() + { + producer.commitTransaction(); + } + CompletableFuture send(String key, String value) { final CompletableFuture result = new CompletableFuture<>(); diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index d72ddfb..a93c974 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -1,6 +1,7 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Value; @@ -43,11 +44,27 @@ public class ApplicationTests TOPIC, "B"); - CompletableFuture result; - - result = instanceA.send("a","message #1"); + instanceA.begin(); + CompletableFuture resultA1 = instanceA.send("a","message #1"); await("Sending of 1. message for A is completed") .atMost(Duration.ofSeconds(5)) - .until(() -> result.isDone()); - } + .until(() -> resultA1.isDone()); + Assertions.assertThat(resultA1.get()) + + instanceB.begin(); + CompletableFuture resultB1 = instanceB.send("b","message #1"); + await("Sending of 1. message for B is completed") + .atMost(Duration.ofSeconds(5)) + .until(() -> resultB1.isDone()); + + CompletableFuture resultA2 = instanceA.send("a","message #2"); + await("Sending of 2. message for A is completed") + .atMost(Duration.ofSeconds(5)) + .until(() -> resultA2.isDone()); + + CompletableFuture resultB2 = instanceB.send("b","message #2"); + await("Sending of 2. message for B is completed") + .atMost(Duration.ofSeconds(5)) + .until(() -> resultB2.isDone()); + } } -- 2.20.1