Springify: Merge der Umstellung des Payloads auf JSON
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
index 1d3546c..ee8e8c4 100644 (file)
@@ -9,7 +9,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
-import org.assertj.core.api.OptionalAssert;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
@@ -18,6 +17,7 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
 import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
@@ -26,8 +26,8 @@ import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -63,6 +63,8 @@ class ApplicationTests
        @Autowired
        KafkaProducer<String, Bytes> kafkaProducer;
        @Autowired
+       KafkaConsumer<String, ClientMessage> kafkaConsumer;
+       @Autowired
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
        @Autowired
        ApplicationProperties properties;
@@ -73,7 +75,7 @@ class ApplicationTests
 
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
-       Set<ConsumerRecord<String, Long>> receivedRecords;
+       Set<ConsumerRecord<String, ClientMessage>> receivedRecords;
 
 
        /** Tests methods */
@@ -82,7 +84,7 @@ class ApplicationTests
        @Order(1) // << The poison pill is not skipped. Hence, this test must run first
        void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
        {
-               send100Messages(i ->  new Bytes(valueSerializer.serialize(TOPIC, i)));
+               send100Messages((key, counter) -> serialize(key, counter));
 
                await("100 records received")
                                .atMost(Duration.ofSeconds(30))
@@ -105,10 +107,10 @@ class ApplicationTests
        @Order(2)
        void commitsOffsetOfErrorForReprocessingOnError()
        {
-               send100Messages(counter ->
+               send100Messages((key, counter) ->
                                counter == 77
                                                ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
-                                               : new Bytes(valueSerializer.serialize(TOPIC, counter)));
+                                               : serialize(key, counter));
 
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
@@ -122,9 +124,9 @@ class ApplicationTests
                assertThatNoException()
                                .describedAs("Consumer should not be running")
                                .isThrownBy(() -> endlessConsumer.exitStatus());
-               ((OptionalAssert<Exception>)assertThat(endlessConsumer.exitStatus()))
-                               .describedAs("Consumer should have exited abnormally")
-                               .containsInstanceOf(RecordDeserializationException.class);
+               assertThat(endlessConsumer.exitStatus())
+                               .describedAs("Consumer should have exited abnormally")
+                               .containsInstanceOf(RecordDeserializationException.class);
        }
 
 
@@ -181,7 +183,7 @@ class ApplicationTests
        }
 
 
-       void send100Messages(Function<Long, Bytes> messageGenerator)
+       void send100Messages(BiFunction<Integer, Long, Bytes> messageGenerator)
        {
                long i = 0;
 
@@ -189,7 +191,7 @@ class ApplicationTests
                {
                        for (int key = 0; key < 10; key++)
                        {
-                               Bytes value = messageGenerator.apply(++i);
+                               Bytes value = messageGenerator.apply(key, ++i);
 
                                ProducerRecord<String, Bytes> record =
                                                new ProducerRecord<>(
@@ -198,6 +200,7 @@ class ApplicationTests
                                                                Integer.toString(key%2),
                                                                value);
 
+                               record.headers().add("__TypeId__", "message".getBytes());
                                kafkaProducer.send(record, (metadata, e) ->
                                {
                                        if (metadata != null)
@@ -222,6 +225,14 @@ class ApplicationTests
                }
        }
 
+       Bytes serialize(Integer key, Long value)
+       {
+               ClientMessage message = new ClientMessage();
+               message.setClient(key.toString());
+               message.setMessage(value.toString());
+               return new Bytes(valueSerializer.serialize(TOPIC, message));
+       }
+
 
        @BeforeEach
        public void init()
@@ -263,14 +274,14 @@ class ApplicationTests
                }
        }
 
-       public static class RecordHandler implements Consumer<ConsumerRecord<String, Long>>
+       public static class RecordHandler implements Consumer<ConsumerRecord<String, ClientMessage>>
        {
-               Consumer<ConsumerRecord<String, Long>> captureOffsets;
-               Consumer<ConsumerRecord<String, Long>> testHandler;
+               Consumer<ConsumerRecord<String, ClientMessage>> captureOffsets;
+               Consumer<ConsumerRecord<String, ClientMessage>> testHandler;
 
 
                @Override
-               public void accept(ConsumerRecord<String, Long> record)
+               public void accept(ConsumerRecord<String, ClientMessage> record)
                {
                        captureOffsets
                                        .andThen(testHandler)
@@ -284,15 +295,15 @@ class ApplicationTests
        {
                @Primary
                @Bean
-               public Consumer<ConsumerRecord<String, Long>> testHandler()
+               public Consumer<ConsumerRecord<String, ClientMessage>> testHandler()
                {
                        return new RecordHandler();
                }
 
                @Bean
-               Serializer<Long> serializer()
+               Serializer<ClientMessage> serializer()
                {
-                       return new LongSerializer();
+                       return new JsonSerializer<>();
                }
 
                @Bean