Der Payload einer Nachricht wird über alle Nachrichten hochgezählt
authorKai Moritz <kai@juplo.de>
Sun, 15 Dec 2024 13:42:40 +0000 (14:42 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 24 Dec 2024 12:54:06 +0000 (13:54 +0100)
src/test/java/de/juplo/kafka/ExampleConsumerTest.java

index b432201..03269fd 100644 (file)
@@ -155,6 +155,8 @@ public class ExampleConsumerTest
   final LongSerializer serializer = new LongSerializer();
   final long[] currentOffsets = new long[NUM_PARTITIONS];
 
+  long nextMessage = 1;
+
   final MockRecordHandler mockRecordHandler = new MockRecordHandler();
   final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean();
 
@@ -191,6 +193,7 @@ public class ExampleConsumerTest
       .all()
       .get();
     mockRecordHandler.clear();
+    nextMessage = 1;
     isTerminatedExceptionally.set(false);
   }
 
@@ -212,7 +215,7 @@ public class ExampleConsumerTest
 
   private void sendValidMessage(int partition)
   {
-    send(partition, partition);
+    send(partition, nextMessage);
   }
 
   private void sendNonDeserializableMessage(int partition)
@@ -232,6 +235,7 @@ public class ExampleConsumerTest
 
   private void send(int partition, byte[] bytes)
   {
+    nextMessage++;
     kafkaTemplate
       .send(TOPIC, partition, "EGAL", bytes)
       .thenAccept(result ->