counter: 1.2.9 - Reconfigured tests to receive data as domain-instances counter-1.2.9
authorKai Moritz <kai@juplo.de>
Sun, 12 May 2024 21:30:04 +0000 (23:30 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 14 May 2024 20:33:49 +0000 (22:33 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/counter/Message.java
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java

diff --git a/pom.xml b/pom.xml
index 0b134f3..a710d47 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>counter</artifactId>
-       <version>1.2.8</version>
+       <version>1.2.9</version>
        <name>Wordcount-Counter</name>
        <description>Word-counting stream-processor of the multi-user wordcount-example</description>
        <properties>
index 9785f69..f836f89 100644 (file)
@@ -12,7 +12,6 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
-import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -34,11 +33,9 @@ public class CounterApplicationConfiguriation
                propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
                propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               propertyMap.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
                propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
                propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
                propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
-               propertyMap.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
                propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
                if (counterProperties.getCommitInterval() != null)
                        propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
index 0767dd1..958f9b7 100644 (file)
@@ -1,9 +1,13 @@
 package de.juplo.kafka.wordcount.counter;
 
-import lombok.Value;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
 
-@Value(staticConstructor = "of")
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
 public class WordCount
 {
   String user;
index 559b171..158c6ae 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.counter;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
@@ -14,9 +15,9 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.*;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
@@ -77,7 +78,7 @@ public class CounterApplicationIT
                private final List<Message> received = new LinkedList<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-               public synchronized void receive(ConsumerRecord<String, String> record)
+               public synchronized void receive(ConsumerRecord<Word, WordCount> record)
                {
                        log.debug("Received message: {}", record);
                        received.add(Message.of(record.key(),record.value()));
@@ -114,6 +115,37 @@ public class CounterApplicationIT
                        return new DefaultKafkaProducerFactory<>(propertyMap);
                }
 
+               @Bean
+               ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
+                               Properties streamProcessorProperties)
+               {
+                       Map<String, Object> propertyMap = streamProcessorProperties
+                                       .entrySet()
+                                       .stream()
+                                       .collect(
+                                                       Collectors.toMap(
+                                                                       entry -> (String)entry.getKey(),
+                                                                       entry -> entry.getValue()
+                                                       ));
+
+                       propertyMap.put(
+                                       ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                                       JsonDeserializer.class.getName());
+                       propertyMap.put(
+                                       ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                                       JsonDeserializer.class.getName());
+
+                       ConsumerFactory<? super Object, ? super Object> consumerFactory =
+                                       new DefaultKafkaConsumerFactory<>(propertyMap);
+
+                       ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory =
+                                       new ConcurrentKafkaListenerContainerFactory<>();
+
+                       kafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
+
+                       return kafkaListenerContainerFactory;
+               }
+
                @Bean
                Consumer consumer()
                {
index 902e93f..42ca78b 100644 (file)
@@ -1,17 +1,19 @@
 package de.juplo.kafka.wordcount.counter;
 
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 
 public class CounterStreamProcessorTopologyTest
@@ -32,17 +34,30 @@ public class CounterStreamProcessorTopologyTest
     Properties streamProcessorProperties =
         applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
 
+    Map<String, ?> propertyMap = streamProcessorProperties
+        .entrySet()
+        .stream()
+        .collect(
+            Collectors.toMap(
+                entry -> (String)entry.getKey(),
+                entry -> entry.getValue()
+            ));
+    JsonSerde<?> keySerde = new JsonSerde<>();
+    keySerde.configure(propertyMap, true);
+    JsonSerde<?> valueSerde = new JsonSerde<>();
+    valueSerde.configure(propertyMap, false);
+
     TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
 
     TestInputTopic<String, Word> in = testDriver.createInputTopic(
         IN,
-        new JsonSerializer<>(),
-        new JsonSerializer<>());
+        (JsonSerializer<String>)keySerde.serializer(),
+        (JsonSerializer<Word>)valueSerde.serializer());
 
-    TestOutputTopic<String, String> out = testDriver.createOutputTopic(
+    TestOutputTopic<Word, WordCount> out = testDriver.createOutputTopic(
         OUT,
-        new StringDeserializer(),
-        new StringDeserializer());
+        (JsonDeserializer<Word>)keySerde.deserializer(),
+        (JsonDeserializer<WordCount>)valueSerde.deserializer());
 
     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
 
index 8ed7cc5..eb64e6d 100644 (file)
@@ -6,6 +6,6 @@ import lombok.Value;
 @Value(staticConstructor = "of")
 public class Message
 {
-  String key;
-  String value;
+  Word key;
+  WordCount value;
 }
index f795e77..4a65a78 100644 (file)
@@ -69,37 +69,37 @@ class TestData
        static Message[] expectedMessages =
        {
                        Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Hallo\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Hallo\",\"count\":1}"),
+                                       Word.of("peter","Hallo"),
+                                       WordCount.of("peter","Hallo",1)),
                        Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"Müsch\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"Müsch\",\"count\":1}"),
+                                       Word.of("klaus","Müsch"),
+                                       WordCount.of("klaus","Müsch",1)),
                        Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Welt\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Welt\",\"count\":1}"),
+                                       Word.of("peter","Welt"),
+                                       WordCount.of("peter","Welt",1)),
                        Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"Müsch\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"Müsch\",\"count\":2}"),
+                                       Word.of("klaus","Müsch"),
+                                       WordCount.of("klaus","Müsch",2)),
                        Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"s\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"s\",\"count\":1}"),
+                                       Word.of("klaus","s"),
+                                       WordCount.of("klaus","s",1)),
                        Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":1}"),
+                                       Word.of("peter","Boäh"),
+                                       WordCount.of("peter","Boäh",1)),
                        Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Welt\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Welt\",\"count\":2}"),
+                                       Word.of("peter","Welt"),
+                                       WordCount.of("peter","Welt",2)),
                        Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":2}"),
+                                       Word.of("peter","Boäh"),
+                                       WordCount.of("peter","Boäh",2)),
                        Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"s\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"s\",\"count\":2}"),
+                                       Word.of("klaus","s"),
+                                       WordCount.of("klaus","s",2)),
                        Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":3}"),
+                                       Word.of("peter","Boäh"),
+                                       WordCount.of("peter","Boäh",3)),
                        Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"s\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"s\",\"count\":3}"),
+                                       Word.of("klaus","s"),
+                                       WordCount.of("klaus","s",3)),
        };
 }