Tests aus gemerged springified-consumer--serialization -> deserialization
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTest.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.LongSerializer;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.apache.kafka.common.utils.Bytes;
8 import org.springframework.boot.test.context.TestConfiguration;
9 import org.springframework.context.annotation.Bean;
10 import org.springframework.context.annotation.Primary;
11 import org.springframework.test.context.ContextConfiguration;
12
13 import java.util.Set;
14 import java.util.function.Consumer;
15
16
17 @ContextConfiguration(classes = ApplicationTest.Configuration.class)
18 public class ApplicationTest extends GenericApplicationTest<String, Long>
19 {
20   public ApplicationTest()
21   {
22     super(
23         new RecordGenerator()
24         {
25           final StringSerializer stringSerializer = new StringSerializer();
26           final LongSerializer longSerializer = new LongSerializer();
27
28
29           @Override
30           public void generate(
31               int numberOfMessagesToGenerate,
32               Set<Integer> poisonPills,
33               Set<Integer> logicErrors,
34               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
35           {
36             int i = 0;
37
38             for (int partition = 0; partition < 10; partition++)
39             {
40               for (int key = 0; key < 10; key++)
41               {
42                 if (++i > numberOfMessagesToGenerate)
43                   return;
44
45                 Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
46                 if (logicErrors.contains(i))
47                 {
48                   value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
49                 }
50                 if (poisonPills.contains(i))
51                 {
52                   value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
53                 }
54
55                 ProducerRecord<Bytes, Bytes> record =
56                     new ProducerRecord<>(
57                         TOPIC,
58                         partition,
59                         new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
60                         value);
61
62                 messageSender.accept(record);
63               }
64             }
65           }
66         });
67   }
68
69
70   @TestConfiguration
71   public static class Configuration
72   {
73     @Primary
74     @Bean
75     public Consumer<ConsumerRecord<String, Long>> consumer()
76     {
77       return (record) ->
78       {
79         if (record.value() == Long.MIN_VALUE)
80           throw new RuntimeException("BOOM (Logic-Error)!");
81       };
82     }
83   }
84 }