Verbesserungen und Fachlogik-Test aus 'sumup-adder' gemerged
authorKai Moritz <kai@juplo.de>
Tue, 16 Aug 2022 16:58:10 +0000 (18:58 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 17 Aug 2022 20:42:42 +0000 (22:42 +0200)
src/main/java/de/juplo/kafka/AdderBusinessLogic.java
src/main/java/de/juplo/kafka/AdderResult.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/AdderResults.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/StateDocument.java
src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index 1f3d9aa..d525182 100644 (file)
@@ -8,7 +8,7 @@ import java.util.Optional;
 
 public class AdderBusinessLogic
 {
-  private final Map<String, Long> state;
+  private final Map<String, AdderResult> state;
 
 
   public AdderBusinessLogic()
@@ -16,37 +16,31 @@ public class AdderBusinessLogic
     this(new HashMap<>());
   }
 
-  public AdderBusinessLogic(Map<String, Long> state)
+  public AdderBusinessLogic(Map<String, AdderResult> state)
   {
     this.state = state;
   }
 
 
-  public synchronized void startSum(String user)
-  {
-    if (state.containsKey(user))
-      throw new IllegalStateException("Sumation for " + user + " already in progress, state: " + state.get(user));
-
-    state.put(user, 0l);
-  }
-
   public synchronized Optional<Long> getSum(String user)
   {
-    return Optional.ofNullable(state.get(user));
+    return Optional.ofNullable(state.get(user)).map(result -> result.sum);
   }
 
   public synchronized void addToSum(String user, Integer value)
   {
-    if (!state.containsKey(user))
-      throw new IllegalStateException("No sumation for " + user + " in progress");
     if (value == null || value < 1)
       throw new IllegalArgumentException("Not a positive number: " + value);
 
-    long result = state.get(user) + value;
-    state.put(user, result);
+    long sum =
+        Optional
+            .ofNullable(state.get(user))
+            .map(result -> result.sum)
+            .orElse(0l);
+    state.put(user, new AdderResult(value, sum + value));
   }
 
-  public synchronized Long endSum(String user)
+  public synchronized AdderResult calculate(String user)
   {
     if (!state.containsKey(user))
       throw new IllegalStateException("No sumation for " + user + " in progress");
@@ -54,7 +48,7 @@ public class AdderBusinessLogic
     return state.remove(user);
   }
 
-  protected Map<String, Long> getState()
+  protected Map<String, AdderResult> getState()
   {
     return state;
   }
diff --git a/src/main/java/de/juplo/kafka/AdderResult.java b/src/main/java/de/juplo/kafka/AdderResult.java
new file mode 100644 (file)
index 0000000..44b7da8
--- /dev/null
@@ -0,0 +1,21 @@
+package de.juplo.kafka;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+@Getter
+@EqualsAndHashCode
+public class AdderResult
+{
+  final int number;
+  final long sum;
+
+  @Override
+  public String toString()
+  {
+    return "sum(" + number + ") = " + sum;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/AdderResults.java b/src/main/java/de/juplo/kafka/AdderResults.java
new file mode 100644 (file)
index 0000000..e7f5602
--- /dev/null
@@ -0,0 +1,47 @@
+package de.juplo.kafka;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+public class AdderResults
+{
+  private final Map<Integer, Map<String, List<AdderResult>>> results = new HashMap<>();
+
+
+  public void addResults(Integer partition, String user, AdderResult result)
+  {
+    Map<String, List<AdderResult>> resultsByUser = this.results.get(partition);
+
+    List<AdderResult> results = resultsByUser.get(user);
+    if (results == null)
+    {
+      results = new LinkedList<>();
+      resultsByUser.put(user, results);
+    }
+
+    results.add(result);
+  }
+
+  protected void addPartition(Integer partition, Map<String, List<AdderResult>> results)
+  {
+    this.results.put(partition, results);
+  }
+
+  protected Map<String, List<AdderResult>> removePartition(Integer partition)
+  {
+    return this.results.remove(partition);
+  }
+
+  public Map<Integer, Map<String, List<AdderResult>>> getState()
+  {
+    return results;
+  }
+
+  public Map<String, List<AdderResult>> getState(Integer partition)
+  {
+    return results.get(partition);
+  }
+}
index 5e1f8fb..b58295f 100644 (file)
@@ -17,19 +17,27 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public ApplicationRecordHandler recordHandler()
+  public ApplicationRecordHandler recordHandler(AdderResults adderResults)
   {
-    return new ApplicationRecordHandler();
+    return new ApplicationRecordHandler(adderResults);
+  }
+
+  @Bean
+  public AdderResults adderResults()
+  {
+    return new AdderResults();
   }
 
   @Bean
   public ApplicationRebalanceListener rebalanceListener(
       ApplicationRecordHandler recordHandler,
+      AdderResults adderResults,
       StateRepository stateRepository,
       ApplicationProperties properties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
+        adderResults,
         stateRepository,
         properties.getClientId(),
         Clock.systemDefaultZone(),
index 7256732..32e14e8 100644 (file)
@@ -7,8 +7,7 @@ import org.apache.kafka.common.TopicPartition;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Collection;
-import java.util.Map;
+import java.util.*;
 
 
 @RequiredArgsConstructor
@@ -16,11 +15,14 @@ import java.util.Map;
 public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
 {
   private final ApplicationRecordHandler recordHandler;
+  private final AdderResults adderResults;
   private final StateRepository stateRepository;
   private final String id;
   private final Clock clock;
   private final Duration commitInterval;
 
+  private final Set<Integer> partitions = new HashSet<>();
+
   private Instant lastCommit = Instant.EPOCH;
 
   @Override
@@ -30,11 +32,13 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
     {
       Integer partition = tp.partition();
       log.info("{} - adding partition: {}", id, partition);
+      this.partitions.add(partition);
       StateDocument document =
           stateRepository
               .findById(Integer.toString(partition))
               .orElse(new StateDocument(partition));
       recordHandler.addPartition(partition, document.state);
+      adderResults.addPartition(partition, document.results);
     });
   }
 
@@ -45,17 +49,19 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
     {
       Integer partition = tp.partition();
       log.info("{} - removing partition: {}", id, partition);
-      Map<String, Long> removed = recordHandler.removePartition(partition);
-      for (String key : removed.keySet())
+      this.partitions.remove(partition);
+      Map<String, AdderResult> state = recordHandler.removePartition(partition);
+      for (String key : state.keySet())
       {
         log.info(
             "{} - Seen {} messages for partition={}|key={}",
             id,
-            removed.get(key),
+            state.get(key),
             partition,
             key);
       }
-      stateRepository.save(new StateDocument(partition, removed));
+      Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
+      stateRepository.save(new StateDocument(partition, state, results));
     });
   }
 
@@ -66,10 +72,11 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
     {
       log.debug("Storing data, last commit: {}", lastCommit);
-      recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
+      partitions.forEach(partition -> stateRepository.save(
           new StateDocument(
-              partiton,
-              adder.getState())));
+              partition,
+              recordHandler.getState(partition).getState(),
+              adderResults.getState(partition))));
       lastCommit = clock.instant();
     }
   }
index d0d385c..596f3da 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
@@ -7,9 +8,12 @@ import java.util.HashMap;
 import java.util.Map;
 
 
+@RequiredArgsConstructor
 @Slf4j
 public class ApplicationRecordHandler implements RecordHandler<String, String>
 {
+  private final AdderResults results;
+
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
@@ -19,29 +23,24 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
     Integer partition = record.partition();
     String user = record.key();
     String message = record.value();
-    switch (message)
+
+    if (message.equals("CALCULATE"))
     {
-      case "START":
-        state.get(partition).startSum(user);
-        break;
-
-      case "END":
-        Long result = state.get(partition).endSum(user);
-        log.info("New result for {}: {}", user, result);
-        break;
-
-      default:
-        state.get(partition).addToSum(user, Integer.parseInt(message));
-        break;
+      AdderResult result = state.get(partition).calculate(user);
+      log.info("New result for {}: {}", user, result);
+      results.addResults(partition, user, result);
+      return;
     }
+
+    state.get(partition).addToSum(user, Integer.parseInt(message));
   }
 
-  protected void addPartition(Integer partition, Map<String, Long> state)
+  protected void addPartition(Integer partition, Map<String, AdderResult> state)
   {
     this.state.put(partition, new AdderBusinessLogic(state));
   }
 
-  protected Map<String, Long> removePartition(Integer partition)
+  protected Map<String, AdderResult> removePartition(Integer partition)
   {
     return this.state.remove(partition).getState();
   }
@@ -51,4 +50,9 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
   {
     return state;
   }
+
+  public AdderBusinessLogic getState(Integer partition)
+  {
+    return state.get(partition);
+  }
 }
index d389271..26a5bc8 100644 (file)
@@ -5,6 +5,7 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -17,6 +18,7 @@ public class DriverController
 {
   private final EndlessConsumer consumer;
   private final ApplicationRecordHandler recordHandler;
+  private final AdderResults results;
 
 
   @PostMapping("start")
@@ -33,7 +35,7 @@ public class DriverController
 
 
   @GetMapping("state")
-  public Map<Integer, Map<String, Long>> state()
+  public Map<Integer, Map<String, AdderResult>> state()
   {
     return
         recordHandler
@@ -46,7 +48,7 @@ public class DriverController
   }
 
   @GetMapping("state/{user}")
-  public ResponseEntity<Long> seen(@PathVariable String user)
+  public ResponseEntity<Long> state(@PathVariable String user)
   {
     for (AdderBusinessLogic adder : recordHandler.getState().values())
     {
@@ -58,6 +60,25 @@ public class DriverController
     return ResponseEntity.notFound().build();
   }
 
+  @GetMapping("results")
+  public Map<Integer, Map<String, List<AdderResult>>> results()
+  {
+    return results.getState();
+  }
+
+  @GetMapping("results/{user}")
+  public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
+  {
+    for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
+    {
+      List<AdderResult> results = resultsByUser.get(user);
+      if (results != null)
+        return ResponseEntity.ok(results);
+    }
+
+    return ResponseEntity.notFound().build();
+  }
+
 
   @ExceptionHandler
   @ResponseStatus(HttpStatus.BAD_REQUEST)
index c85cc38..ae8eb51 100644 (file)
@@ -5,6 +5,7 @@ import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 
@@ -14,7 +15,8 @@ public class StateDocument
 {
   @Id
   public String id;
-  public Map<String, Long> state;
+  public Map<String, AdderResult> state;
+  public Map<String, List<AdderResult>> results;
 
   public StateDocument()
   {
@@ -24,13 +26,16 @@ public class StateDocument
   {
     this.id = Integer.toString(partition);
     this.state = new HashMap<>();
+    this.results = new HashMap<>();
   }
 
   public StateDocument(
       Integer partition,
-      Map<String, Long> state)
+      Map<String, AdderResult> state,
+      Map<String, List<AdderResult>> results)
   {
     this.id = Integer.toString(partition);
     this.state = state;
+    this.results = results;
   }
 }
index 435f036..8e49263 100644 (file)
@@ -16,23 +16,6 @@ import static org.assertj.core.api.Assertions.*;
 
 public class AdderBusinessLogicTest
 {
-  @Test
-  @DisplayName("A new sum can be started, if it does not exist")
-  public void testStartSumPossibleIfNotExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatNoException().isThrownBy(() -> adder.startSum("foo"));
-  }
-
-  @Test
-  @DisplayName("Starting an already existing sum again, causes an IllegalStateException")
-  public void testStartSumCausesExceptionIfExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThatIllegalStateException().isThrownBy(() -> adder.startSum("foo"));
-  }
-
   @Test
   @DisplayName("An empty Optional should be returned, for a non-existing sum")
   public void testGetSumReturnsEmptyOptionalForNonExistingSum()
@@ -46,101 +29,83 @@ public class AdderBusinessLogicTest
   public void testGetSumReturnsNonEmptyOptionalForExistingSum()
   {
     AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
+    adder.addToSum("foo", 6);
     assertThat(adder.getSum("foo")).isNotEmpty();
   }
 
   @Test
-  @DisplayName("A sum can be ended, if it does exist")
-  public void testEndSumPossibleIfSumExists()
+  @DisplayName("A sum can be calculated, if it does exist")
+  public void testCalculatePossibleIfSumExists()
   {
     AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThatNoException().isThrownBy(() -> adder.endSum("foo"));
+    adder.addToSum("foo", 6);
+    assertThatNoException().isThrownBy(() -> adder.calculate("foo"));
   }
 
   @Test
   @DisplayName("An existing sum is removed, if ended")
-  public void testEndSumRemovesSumIfSumExists()
+  public void testCalculateRemovesSumIfSumExists()
   {
     AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    adder.endSum("foo");
+    adder.addToSum("foo", 6);
+    adder.calculate("foo");
     assertThat(adder.getSum("foo")).isEmpty();
   }
 
   @Test
-  @DisplayName("An existing Sum returns a non-null value, if ended")
-  public void testEndSumReturnsNonNullValueIfSumExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThat(adder.endSum("foo")).isNotNull();
-  }
-
-  @Test
-  @DisplayName("An existing Sum returns a non-negative value, if ended")
-  public void testEndSumReturnsNonNegativeValueIfSumExists()
+  @DisplayName("An existing sum returns a non-null value, if calculated")
+  public void testCalculateReturnsNonNullValueIfSumExists()
   {
     AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThat(adder.endSum("foo")).isNotNegative();
+    adder.addToSum("foo", 6);
+    assertThat(adder.calculate("foo")).isNotNull();
   }
 
   @Test
   @DisplayName("Ending a non-existing sum, causes an IllegalStateException")
-  public void testEndSumCausesExceptionIfNotExists()
+  public void testCalculateCausesExceptionIfNotExists()
   {
     AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatIllegalStateException().isThrownBy(() -> adder.endSum("foo"));
+    assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo"));
   }
 
   @Test
-  @DisplayName("Adding to a non-existent sum causes an IllegalStateException")
-  public void testAddToSumCausesExceptionIfNotExists()
+  @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException")
+  public void testAddToSumWithNullValueCausesException()
   {
     AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatIllegalStateException().isThrownBy(() -> adder.addToSum("foo", 1));
-  }
-
-  @Test
-  @DisplayName("Adding a null-value to an existing sum causes an IllegalArgumentException")
-  public void testAddSumWithNullValueToExistingSumCausesException()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
     assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null));
   }
 
   @ParameterizedTest(name = "{index}: Adding {0}")
-  @DisplayName("Adding a non-positive value to an existing sum causes an IllegalArgumentException")
+  @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException")
   @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE })
-  public void testAddSumWithNonPositiveValueToExistingSumCausesException(int value)
+  public void testAddToSumWithNonPositiveValueCausesException(int value)
   {
     AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
     assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value));
   }
 
-  @Test
-  @DisplayName("Can add a positive value to an existing sum")
-  public void testAddSumWithPositiveValuePossibleIfSumExists()
+  @ParameterizedTest(name = "{index}: Adding {0}")
+  @DisplayName("Can add a positive value to a sum")
+  @ValueSource(ints = { 1, 3, 6, 66, 7, 9 })
+  public void testAddToSumWithPositiveValuePossible(int value)
   {
     AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", -1));
+    assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value));
   }
 
   @ParameterizedTest(name = "{index}: Summing up {0}")
   @DisplayName("Adds up numbers correctly")
   @MethodSource("numbersProvider")
-  public void testAddSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
+  public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
   {
     long expectedResult = Arrays.stream(numbers).sum();
     AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
     Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number));
-    assertThat(adder.endSum("foo")).isEqualTo(expectedResult);
+    AdderResult result = adder.calculate("foo");
+    assertThat(result.number).isEqualTo(numbers[numbers.length-1]);
+    assertThat(result.sum).isEqualTo(expectedResult);
   }
 
   static Stream<Arguments> numbersProvider() {
index 4ddf8a9..1336050 100644 (file)
@@ -3,82 +3,122 @@ package de.juplo.kafka;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.springframework.beans.factory.annotation.Autowired;
 
+import java.util.*;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 
 public class ApplicationTests extends GenericApplicationTests<String, String>
 {
+  @Autowired
+  AdderResults results;
+
+
   public ApplicationTests()
   {
-    super(
-        new RecordGenerator()
+    super(new ApplicationTestRecrodGenerator());
+    ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
+  }
+
+
+  static class ApplicationTestRecrodGenerator implements RecordGenerator
+  {
+    ApplicationTests tests;
+
+    final int[] numbers = {1, 7, 3, 2, 33, 6, 11};
+    final String[] dieWilden13 =
+        IntStream
+            .range(1, 14)
+            .mapToObj(i -> "seeräuber-" + i)
+            .toArray(i -> new String[i]);
+    final StringSerializer stringSerializer = new StringSerializer();
+    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
+
+    int counter = 0;
+
+    Map<String, List<AdderResult>> state;
+
+    @Override
+    public int generate(
+        boolean poisonPills,
+        boolean logicErrors,
+        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+    {
+      counter = 0;
+      state =
+          Arrays
+              .stream(dieWilden13)
+              .collect(Collectors.toMap(
+                  seeräuber -> seeräuber,
+                  seeräuber -> new LinkedList()));
+
+      for (int i = 0; i < 33; i++)
+      {
+        String seeräuber = dieWilden13[i % 13];
+        int number = numbers[i % 7];
+
+        Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
+
+        for (int message = 1; message <= number; message++)
         {
-          final int[] numbers = { 1, 7, 3, 2, 33, 6, 11 };
-          final String[] dieWilden13 =
-              IntStream
-                  .range(1,14)
-                  .mapToObj(i -> "seeräuber-" + i)
-                  .toArray(i -> new String[i]);
-          final StringSerializer stringSerializer = new StringSerializer();
-          final Bytes startMessage = new Bytes(stringSerializer.serialize(TOPIC, "START"));
-          final Bytes endMessage = new Bytes(stringSerializer.serialize(TOPIC, "END"));
-
-          int counter = 0;
-
-
-          @Override
-          public int generate(
-              boolean poisonPills,
-              boolean logicErrors,
-              Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-          {
-            counter = 0;
-
-            for (int i = 0; i < 33; i++)
-            {
-              String seeräuber = dieWilden13[i%13];
-              int number = numbers[i%7];
-
-              Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
-
-              send(key, startMessage, logicErrors, messageSender);
-              for (int message = 1; message <= number; message++)
-              {
-                Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
-                send(key, value, logicErrors, messageSender);
-              }
-              send(key, endMessage, logicErrors, messageSender);
-            }
-
-            return counter;
-          }
-
-          void send(
-              Bytes key,
-              Bytes value,
-              boolean logicErrors,
-              Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-          {
-            counter++;
+          Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
+          send(key, value, logicErrors, messageSender);
+        }
+        send(key, calculateMessage, logicErrors, messageSender);
+
+        state.get(seeräuber).add(new AdderResult(number, (number + 1) * number / 2));
+      }
 
-            if (counter == 77)
-            {
-              if (logicErrors)
-              {
-                value = value.equals(startMessage) ? endMessage : startMessage;
-              }
-            }
+      return counter;
+    }
 
-            messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
-          }
+    void send(
+        Bytes key,
+        Bytes value,
+        boolean logicErrors,
+        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+    {
+      counter++;
 
-          @Override
-          public boolean canGeneratePoisonPill()
+      if (counter == 77)
+      {
+        if (logicErrors)
+        {
+          value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
+        }
+      }
+
+      messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
+    }
+
+    @Override
+    public boolean canGeneratePoisonPill()
+    {
+      return false;
+    }
+
+    @Override
+    public void assertBusinessLogic()
+    {
+      tests.results
+          .getState()
+          .values()
+          .stream()
+          .flatMap(map -> map.entrySet().stream())
+          .forEach(entry ->
           {
-            return false;
-          }
-        });
+            String user = entry.getKey();
+            List<AdderResult> resultsForUser = entry.getValue();
+
+            assertThat(state.get(user))
+                .describedAs("Unexpected results for user {}", user)
+                .containsExactlyElementsOf(resultsForUser);
+          });
+    }
   }
 }