]> juplo.de Git - demos/kafka/training/commitdiff
Der Payload einer Nachricht wird über alle Nachrichten hochgezählt
authorKai Moritz <kai@juplo.de>
Sun, 15 Dec 2024 13:42:40 +0000 (14:42 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 6 Feb 2025 17:03:39 +0000 (18:03 +0100)
src/test/java/de/juplo/kafka/ExampleConsumerTest.java

index b4322010c7aff98151f51b390cb9311944c8fdd6..03269fd2eccead896313aa943d0212b0f24b3403 100644 (file)
@@ -155,6 +155,8 @@ public class ExampleConsumerTest
   final LongSerializer serializer = new LongSerializer();
   final long[] currentOffsets = new long[NUM_PARTITIONS];
 
+  long nextMessage = 1;
+
   final MockRecordHandler mockRecordHandler = new MockRecordHandler();
   final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean();
 
@@ -191,6 +193,7 @@ public class ExampleConsumerTest
       .all()
       .get();
     mockRecordHandler.clear();
+    nextMessage = 1;
     isTerminatedExceptionally.set(false);
   }
 
@@ -212,7 +215,7 @@ public class ExampleConsumerTest
 
   private void sendValidMessage(int partition)
   {
-    send(partition, partition);
+    send(partition, nextMessage);
   }
 
   private void sendNonDeserializableMessage(int partition)
@@ -232,6 +235,7 @@ public class ExampleConsumerTest
 
   private void send(int partition, byte[] bytes)
   {
+    nextMessage++;
     kafkaTemplate
       .send(TOPIC, partition, "EGAL", bytes)
       .thenAccept(result ->