Signatur und Handling des `RecordGenerator` vereinfacht/überarbeitet
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.LongSerializer;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.apache.kafka.common.utils.Bytes;
8 import org.springframework.boot.test.context.TestConfiguration;
9 import org.springframework.context.annotation.Bean;
10 import org.springframework.context.annotation.Primary;
11 import org.springframework.test.context.ContextConfiguration;
12
13 import java.util.Set;
14 import java.util.function.Consumer;
15
16
17 @ContextConfiguration(classes = ApplicationTests.Configuration.class)
18 public class ApplicationTests extends GenericApplicationTests<String, Long>
19 {
20   public ApplicationTests()
21   {
22     super(
23         new RecordGenerator()
24         {
25           final StringSerializer stringSerializer = new StringSerializer();
26           final LongSerializer longSerializer = new LongSerializer();
27
28
29           @Override
30           public int generate(
31               boolean poisonPills,
32               boolean logicErrors,
33               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
34           {
35             int i = 0;
36
37             for (int partition = 0; partition < 10; partition++)
38             {
39               for (int key = 0; key < 10; key++)
40               {
41                 i++;
42
43                 Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
44                 if (i == 77)
45                 {
46                   if (logicErrors)
47                   {
48                     value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
49                   }
50                   if (poisonPills)
51                   {
52                     value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
53                   }
54                 }
55
56                 ProducerRecord<Bytes, Bytes> record =
57                     new ProducerRecord<>(
58                         TOPIC,
59                         partition,
60                         new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
61                         value);
62
63                 messageSender.accept(record);
64               }
65             }
66
67             return i;
68           }
69         });
70   }
71
72
73   @TestConfiguration
74   public static class Configuration
75   {
76     @Primary
77     @Bean
78     public Consumer<ConsumerRecord<String, Long>> consumer()
79     {
80       return (record) ->
81       {
82         if (record.value() == Long.MIN_VALUE)
83           throw new RuntimeException("BOOM (Logic-Error)!");
84       };
85     }
86   }
87 }