From: Kai Moritz Date: Wed, 24 Jan 2024 21:55:41 +0000 (+0100) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e1e2990a0c20c7eff508356191955597fee3a7c0;p=demos%2Fkafka%2Ftraining WIP --- diff --git a/pom.xml b/pom.xml index 1c6d364..458b0e3 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,11 @@ awaitility test + + org.assertj + assertj-core + test + diff --git a/src/main/java/de/juplo/kafka/SimpleProducer.java b/src/main/java/de/juplo/kafka/SimpleProducer.java index b9e8152..3cf1128 100644 --- a/src/main/java/de/juplo/kafka/SimpleProducer.java +++ b/src/main/java/de/juplo/kafka/SimpleProducer.java @@ -58,6 +58,16 @@ public class SimpleProducer } } + void begin() + { + producer.beginTransaction(); + } + + void commit() + { + producer.commitTransaction(); + } + CompletableFuture send(String key, String value) { final CompletableFuture result = new CompletableFuture<>(); diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index d72ddfb..a93c974 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -1,6 +1,7 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Value; @@ -43,11 +44,27 @@ public class ApplicationTests TOPIC, "B"); - CompletableFuture result; - - result = instanceA.send("a","message #1"); + instanceA.begin(); + CompletableFuture resultA1 = instanceA.send("a","message #1"); await("Sending of 1. message for A is completed") .atMost(Duration.ofSeconds(5)) - .until(() -> result.isDone()); - } + .until(() -> resultA1.isDone()); + Assertions.assertThat(resultA1.get()) + + instanceB.begin(); + CompletableFuture resultB1 = instanceB.send("b","message #1"); + await("Sending of 1. message for B is completed") + .atMost(Duration.ofSeconds(5)) + .until(() -> resultB1.isDone()); + + CompletableFuture resultA2 = instanceA.send("a","message #2"); + await("Sending of 2. message for A is completed") + .atMost(Duration.ofSeconds(5)) + .until(() -> resultA2.isDone()); + + CompletableFuture resultB2 = instanceB.send("b","message #2"); + await("Sending of 2. message for B is completed") + .atMost(Duration.ofSeconds(5)) + .until(() -> resultB2.isDone()); + } }