Verbesserte Tests und Korrekturen gemerged: sumup-adder -> stored-offsets
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.LongSerializer;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.apache.kafka.common.utils.Bytes;
8 import org.springframework.boot.test.context.TestConfiguration;
9 import org.springframework.context.annotation.Bean;
10 import org.springframework.context.annotation.Primary;
11 import org.springframework.test.context.ContextConfiguration;
12
13 import java.util.function.Consumer;
14
15
16 @ContextConfiguration(classes = ApplicationTests.Configuration.class)
17 public class ApplicationTests extends GenericApplicationTests<String, Long>
18 {
19   public ApplicationTests()
20   {
21     super(
22         new RecordGenerator()
23         {
24           final StringSerializer stringSerializer = new StringSerializer();
25           final LongSerializer longSerializer = new LongSerializer();
26
27
28           @Override
29           public int generate(
30               boolean poisonPills,
31               boolean logicErrors,
32               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
33           {
34             int i = 0;
35
36             for (int partition = 0; partition < 10; partition++)
37             {
38               for (int key = 0; key < 10; key++)
39               {
40                 i++;
41
42                 Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
43                 if (i == 77)
44                 {
45                   if (logicErrors)
46                   {
47                     value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
48                   }
49                   if (poisonPills)
50                   {
51                     value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
52                   }
53                 }
54
55                 ProducerRecord<Bytes, Bytes> record =
56                     new ProducerRecord<>(
57                         TOPIC,
58                         partition,
59                         new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
60                         value);
61
62                 messageSender.accept(record);
63               }
64             }
65
66             return i;
67           }
68         });
69   }
70
71
72   @TestConfiguration
73   public static class Configuration
74   {
75     @Primary
76     @Bean
77     public ApplicationRecordHandler recordHandler()
78     {
79       ApplicationRecordHandler recordHandler = new ApplicationRecordHandler();
80       return new ApplicationRecordHandler()
81       {
82         @Override
83         public void accept(ConsumerRecord<String, Long> record)
84         {
85           if (record.value() == Long.MIN_VALUE)
86             throw new RuntimeException("BOOM (Logic-Error)!");
87           super.accept(record);
88         }
89       };
90     }
91   }
92 }