counter: 1.2.8 - Reconfigured tests to send data as domain instances
[demos/kafka/wordcount] src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
index b412fe4..559b171 100644
@@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.counter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeEach;
@@ -13,12 +14,18 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
 import java.time.Duration;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
 
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.*;
 import static org.awaitility.Awaitility.*;
@@ -41,7 +48,7 @@ public class CounterApplicationIT
        static final int PARTITIONS = 2;
 
        @Autowired
-       KafkaTemplate<String, String> kafkaTemplate;
+       KafkaTemplate<String, Word> kafkaTemplate;
        @Autowired
        Consumer consumer;
 
@@ -85,6 +92,28 @@ public class CounterApplicationIT
        @TestConfiguration
        static class Configuration
        {
+               @Bean
+               ProducerFactory<?, ?> producerFactory(Properties streamProcessorProperties)
+               {
+                       Map<String, Object> propertyMap = streamProcessorProperties
+                                       .entrySet()
+                                       .stream()
+                                       .collect(
+                                                       Collectors.toMap(
+                                                                       entry -> (String)entry.getKey(),
+                                                                       entry -> entry.getValue()
+                                                       ));
+
+                       propertyMap.put(
+                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                                       JsonSerializer.class.getName());
+                       propertyMap.put(
+                                       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                                       JsonSerializer.class.getName());
+
+                       return new DefaultKafkaProducerFactory<>(propertyMap);
+               }
+
                @Bean
                Consumer consumer()
                {
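
Note (not part of the commit): a minimal sketch of how the reconfigured wiring is meant to be used by the test. The new ProducerFactory copies the stream processor's Properties and overrides both serializers with JsonSerializer, so the autowired KafkaTemplate<String, Word> can send domain instances directly instead of pre-serialized strings. The Word shape shown here (public "user"/"word" fields) and the topic name "in" are assumptions for illustration only; the real Word class and topic come from the production code, not from this diff.

// SendDomainInstanceSketch.java -- illustrative only, assumptions marked below
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;

class SendDomainInstanceSketch
{
	// Assumed minimal shape of the Word domain class; public fields so the
	// Jackson-based JsonSerializer can serialize it without extra config.
	static class Word
	{
		public String user;
		public String word;
	}

	@Autowired
	KafkaTemplate<String, Word> kafkaTemplate; // built from the ProducerFactory bean above

	void sendExample()
	{
		Word word = new Word();
		word.user = "peter";  // illustrative values
		word.word = "Hallo";
		// The JsonSerializer configured in the ProducerFactory turns the Word
		// instance into a JSON record on the wire; "in" is an assumed topic name.
		kafkaTemplate.send("in", word.user, word);
	}
}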