Verbesserte Tests und Korrekturen gemerged: sumup-adder -> stored-offsets
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
index 4ddf8a9..5166227 100644 (file)
@@ -1,31 +1,28 @@
 package de.juplo.kafka;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.test.context.ContextConfiguration;
 
 import java.util.function.Consumer;
-import java.util.stream.IntStream;
 
 
-public class ApplicationTests extends GenericApplicationTests<String, String>
+@ContextConfiguration(classes = ApplicationTests.Configuration.class)
+public class ApplicationTests extends GenericApplicationTests<String, Long>
 {
   public ApplicationTests()
   {
     super(
         new RecordGenerator()
         {
-          final int[] numbers = { 1, 7, 3, 2, 33, 6, 11 };
-          final String[] dieWilden13 =
-              IntStream
-                  .range(1,14)
-                  .mapToObj(i -> "seeräuber-" + i)
-                  .toArray(i -> new String[i]);
           final StringSerializer stringSerializer = new StringSerializer();
-          final Bytes startMessage = new Bytes(stringSerializer.serialize(TOPIC, "START"));
-          final Bytes endMessage = new Bytes(stringSerializer.serialize(TOPIC, "END"));
-
-          int counter = 0;
+          final LongSerializer longSerializer = new LongSerializer();
 
 
           @Override
@@ -34,51 +31,62 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
               boolean logicErrors,
               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
           {
-            counter = 0;
+            int i = 0;
 
-            for (int i = 0; i < 33; i++)
+            for (int partition = 0; partition < 10; partition++)
             {
-              String seeräuber = dieWilden13[i%13];
-              int number = numbers[i%7];
-
-              Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
-
-              send(key, startMessage, logicErrors, messageSender);
-              for (int message = 1; message <= number; message++)
+              for (int key = 0; key < 10; key++)
               {
-                Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
-                send(key, value, logicErrors, messageSender);
-              }
-              send(key, endMessage, logicErrors, messageSender);
-            }
+                i++;
 
-            return counter;
-          }
+                Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
+                if (i == 77)
+                {
+                  if (logicErrors)
+                  {
+                    value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
+                  }
+                  if (poisonPills)
+                  {
+                    value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
+                  }
+                }
 
-          void send(
-              Bytes key,
-              Bytes value,
-              boolean logicErrors,
-              Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-          {
-            counter++;
+                ProducerRecord<Bytes, Bytes> record =
+                    new ProducerRecord<>(
+                        TOPIC,
+                        partition,
+                        new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
+                        value);
 
-            if (counter == 77)
-            {
-              if (logicErrors)
-              {
-                value = value.equals(startMessage) ? endMessage : startMessage;
+                messageSender.accept(record);
               }
             }
 
-            messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
-          }
-
-          @Override
-          public boolean canGeneratePoisonPill()
-          {
-            return false;
+            return i;
           }
         });
   }
+
+
+  @TestConfiguration
+  public static class Configuration
+  {
+    @Primary
+    @Bean
+    public ApplicationRecordHandler recordHandler()
+    {
+      ApplicationRecordHandler recordHandler = new ApplicationRecordHandler();
+      return new ApplicationRecordHandler()
+      {
+        @Override
+        public void accept(ConsumerRecord<String, Long> record)
+        {
+          if (record.value() == Long.MIN_VALUE)
+            throw new RuntimeException("BOOM (Logic-Error)!");
+          super.accept(record);
+        }
+      };
+    }
+  }
 }