Addder beendet sich bei Fehler und Logik für Beenden vereinfacht
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
deleted file mode 100644 (file)
index bd9f449..0000000
+++ /dev/null
@@ -1,172 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.*;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-@Slf4j
-public class ApplicationTests extends GenericApplicationTests<String, Message>
-{
-  @Autowired
-  StateRepository stateRepository;
-
-
-  public ApplicationTests()
-  {
-    super(new ApplicationTestRecrodGenerator());
-    ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
-  }
-
-
-  static class ApplicationTestRecrodGenerator implements RecordGenerator
-  {
-    ApplicationTests tests;
-
-    final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
-    final String[] dieWilden13 =
-        IntStream
-            .range(1, 14)
-            .mapToObj(i -> "seeräuber-" + i)
-            .toArray(i -> new String[i]);
-    final StringSerializer stringSerializer = new StringSerializer();
-    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
-
-    int counter = 0;
-
-    Map<String, List<AdderResult>> state;
-
-    @Override
-    public int generate(
-        boolean poisonPills,
-        boolean logicErrors,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-    {
-      counter = 0;
-      state =
-          Arrays
-              .stream(dieWilden13)
-              .collect(Collectors.toMap(
-                  seeräuber -> seeräuber,
-                  seeräuber -> new LinkedList()));
-
-      int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int next = 0;
-
-      for (int pass = 0; pass < 333; pass++)
-      {
-        for (int i = 0; i<13; i++)
-        {
-          String seeräuber = dieWilden13[i];
-          Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
-
-          if (message[i] > number[i])
-          {
-            send(
-              key,
-              calculateMessage,
-              Message.Type.CALC,
-              poisonPill(poisonPills, pass, counter),
-              logicError(logicErrors, pass, counter),
-              messageSender);
-            state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
-            // Pick next number to calculate
-            number[i] = numbers[next++%numbers.length];
-            message[i] = 1;
-            log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
-          }
-
-          send(
-            key,
-            new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
-            Message.Type.ADD,
-            poisonPill(poisonPills, pass, counter),
-            logicError(logicErrors, pass, counter),
-            messageSender);
-        }
-      }
-
-      return counter;
-    }
-
-    boolean poisonPill (boolean poisonPills, int pass, int counter)
-    {
-      return poisonPills && pass > 300 && counter%99 == 0;
-    }
-
-    boolean logicError(boolean logicErrors, int pass, int counter)
-    {
-      return logicErrors && pass > 300 && counter%77 == 0;
-    }
-
-    void send(
-        Bytes key,
-        Bytes value,
-        Message.Type type,
-        boolean poisonPill,
-        boolean logicError,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-    {
-      counter++;
-
-      if (logicError)
-      {
-        value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
-      }
-      if (poisonPill)
-      {
-        value = new Bytes("BOOM!".getBytes());
-      }
-
-      ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
-      record.headers().add("__TypeId__", type.toString().getBytes());
-      messageSender.accept(record);
-    }
-
-    @Override
-    public void assertBusinessLogic()
-    {
-      for (int i=0; i<PARTITIONS; i++)
-      {
-        StateDocument stateDocument =
-            tests.stateRepository.findById(Integer.toString(i)).get();
-
-        stateDocument
-            .results
-            .entrySet()
-            .stream()
-            .forEach(entry ->
-            {
-              String user = entry.getKey();
-              List<AdderResult> resultsForUser = entry.getValue();
-
-              for (int j=0; j < resultsForUser.size(); j++)
-              {
-                if (!(j < state.get(user).size()))
-                {
-                  break;
-                }
-
-                assertThat(resultsForUser.get(j))
-                    .as("Unexpected results calculation %d of user %s", j, user)
-                    .isEqualTo(state.get(user).get(j));
-              }
-
-              assertThat(state.get(user))
-                  .as("More results calculated for user %s as expected", user)
-                  .containsAll(resultsForUser);
-            });
-      }
-    }
-  }
-}