ROT: Zur Summe soll die Zahl ausgegeben werden - Logik + Test angepasst
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
index 8369a7b..9dda079 100644 (file)
@@ -1,29 +1,30 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
-import org.springframework.test.context.ContextConfiguration;
 
-import java.util.Set;
 import java.util.function.Consumer;
+import java.util.stream.IntStream;
 
 
-@ContextConfiguration(classes = ApplicationTests.Configuration.class)
-public class ApplicationTests extends GenericApplicationTests<String, Long>
+public class ApplicationTests extends GenericApplicationTests<String, String>
 {
   public ApplicationTests()
   {
     super(
         new RecordGenerator()
         {
+          final int[] numbers = { 1, 7, 3, 2, 33, 6, 11 };
+          final String[] dieWilden13 =
+              IntStream
+                  .range(1,14)
+                  .mapToObj(i -> "seeräuber-" + i)
+                  .toArray(i -> new String[i]);
           final StringSerializer stringSerializer = new StringSerializer();
-          final LongSerializer longSerializer = new LongSerializer();
+          final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
+
+          int counter = 0;
 
 
           @Override
@@ -32,56 +33,50 @@ public class ApplicationTests extends GenericApplicationTests<String, Long>
               boolean logicErrors,
               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
           {
-            int i = 0;
+            counter = 0;
 
-            for (int partition = 0; partition < 10; partition++)
+            for (int i = 0; i < 33; i++)
             {
-              for (int key = 0; key < 10; key++)
+              String seeräuber = dieWilden13[i%13];
+              int number = numbers[i%7];
+
+              Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
+
+              for (int message = 1; message <= number; message++)
               {
-                i++;
+                Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
+                send(key, value, logicErrors, messageSender);
+              }
+              send(key, calculateMessage, logicErrors, messageSender);
+            }
 
-                Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
-                if (i == 77)
-                {
-                  if (logicErrors)
-                  {
-                    value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
-                  }
-                  if (poisonPills)
-                  {
-                    value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
-                  }
-                }
+            return counter;
+          }
 
-                ProducerRecord<Bytes, Bytes> record =
-                    new ProducerRecord<>(
-                        TOPIC,
-                        partition,
-                        new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
-                        value);
+          void send(
+              Bytes key,
+              Bytes value,
+              boolean logicErrors,
+              Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+          {
+            counter++;
 
-                messageSender.accept(record);
+            if (counter == 77)
+            {
+              if (logicErrors)
+              {
+                value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
               }
             }
 
-            return i;
+            messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
           }
-        });
-  }
 
-
-  @TestConfiguration
-  public static class Configuration
-  {
-    @Primary
-    @Bean
-    public Consumer<ConsumerRecord<String, Long>> consumer()
-    {
-      return (record) ->
-      {
-        if (record.value() == Long.MIN_VALUE)
-          throw new RuntimeException("BOOM (Logic-Error)!");
-      };
-    }
+          @Override
+          public boolean canGeneratePoisonPill()
+          {
+            return false;
+          }
+        });
   }
 }