`ApplicationTest` auf Basis der typisierbaren Basis neu implementiert
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTest.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.producer.ProducerRecord;
4 import org.apache.kafka.common.serialization.LongSerializer;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.apache.kafka.common.utils.Bytes;
7
8 import java.util.Set;
9 import java.util.function.Consumer;
10
11
12 public class ApplicationTest extends GenericApplicationTest<String, Long>
13 {
14   public ApplicationTest()
15   {
16     super(
17         new StringSerializer(),
18         new RecordGenerator<> ()
19         {
20           final StringSerializer stringSerializer = new StringSerializer();
21           final LongSerializer longSerializer = new LongSerializer();
22
23
24           @Override
25           public void generate(
26               int numberOfMessagesToGenerate,
27               Set<Integer> poisonPills,
28               Consumer<ProducerRecord<String, Bytes>> messageSender)
29           {
30             int i = 0;
31
32             for (int partition = 0; partition < 10; partition++)
33             {
34               for (int key = 0; key < 10; key++)
35               {
36                 if (++i > numberOfMessagesToGenerate)
37                   return;
38
39                 Bytes value =
40                     poisonPills.contains(i)
41                         ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
42                         : new Bytes(longSerializer.serialize(TOPIC, (long)i));
43
44                 ProducerRecord<String, Bytes> record =
45                     new ProducerRecord<>(
46                         TOPIC,
47                         partition,
48                         Integer.toString(partition*10+key%2),
49                         value);
50
51                 messageSender.accept(record);
52               }
53             }
54           }
55         });
56   }
57 }