Anzahl der erzeugten Test-Nachrichten wird vom `RecordGenerator` bestimmt
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.LongSerializer;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.apache.kafka.common.utils.Bytes;
8 import org.springframework.boot.test.context.TestConfiguration;
9 import org.springframework.context.annotation.Bean;
10 import org.springframework.context.annotation.Primary;
11 import org.springframework.test.context.ContextConfiguration;
12
13 import java.util.Set;
14 import java.util.function.Consumer;
15
16
17 @ContextConfiguration(classes = ApplicationTests.Configuration.class)
18 public class ApplicationTests extends GenericApplicationTests<String, Long>
19 {
20   public ApplicationTests()
21   {
22     super(
23         new RecordGenerator()
24         {
25           final StringSerializer stringSerializer = new StringSerializer();
26           final LongSerializer longSerializer = new LongSerializer();
27
28
29           @Override
30           public void generate(
31               Set<Integer> poisonPills,
32               Set<Integer> logicErrors,
33               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
34           {
35             int i = 0;
36
37             for (int partition = 0; partition < 10; partition++)
38             {
39               for (int key = 0; key < 10; key++)
40               {
41                 if (++i > 100)
42                   return;
43
44                 Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
45                 if (logicErrors.contains(i))
46                 {
47                   value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
48                 }
49                 if (poisonPills.contains(i))
50                 {
51                   value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
52                 }
53
54                 ProducerRecord<Bytes, Bytes> record =
55                     new ProducerRecord<>(
56                         TOPIC,
57                         partition,
58                         new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
59                         value);
60
61                 messageSender.accept(record);
62               }
63             }
64           }
65         });
66   }
67
68
69   @TestConfiguration
70   public static class Configuration
71   {
72     @Primary
73     @Bean
74     public Consumer<ConsumerRecord<String, Long>> consumer()
75     {
76       return (record) ->
77       {
78         if (record.value() == Long.MIN_VALUE)
79           throw new RuntimeException("BOOM (Logic-Error)!");
80       };
81     }
82   }
83 }