counter: 1.2.14 - Set up type-mappings for JSON-Deserialization counter counter-1.2.14
authorKai Moritz <kai@juplo.de>
Tue, 14 May 2024 20:25:51 +0000 (22:25 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 14 May 2024 20:40:46 +0000 (22:40 +0200)
Dockerfile
pom.xml
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/counter/Message.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java

index d2218b8..2e032f3 100644 (file)
@@ -1,4 +1,4 @@
-FROM openjdk:11-jre-slim
+FROM eclipse-temurin:17-jre
 COPY target/*.jar /opt/app.jar
 EXPOSE 8083
 ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
diff --git a/pom.xml b/pom.xml
index cbccc73..3c5e9eb 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -5,12 +5,12 @@
        <parent>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
-               <version>3.0.2</version>
+               <version>3.2.5</version>
                <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>counter</artifactId>
-       <version>1.2.3</version>
+       <version>1.2.14</version>
        <name>Wordcount-Counter</name>
        <description>Word-counting stream-processor of the multi-user wordcount-example</description>
        <properties>
index 1bcc834..926045c 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.wordcount.counter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.StreamsConfig;
@@ -13,7 +12,6 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
-import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -27,26 +25,27 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St
 public class CounterApplicationConfiguriation
 {
        @Bean
-       public Properties propertyMap(CounterApplicationProperties properties)
+       public Properties streamProcessorProperties(CounterApplicationProperties counterProperties)
        {
                Properties propertyMap = new Properties();
 
-               propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
-               propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+               propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
+               propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
                propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, Word.class.getPackageName());
+               propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
                propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
                propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
                propertyMap.put(
                                JsonDeserializer.TYPE_MAPPINGS,
-                               "W:" + Word.class.getName() + "," +
-                               "C:" + WordCount.class.getName());
+                               "word:" + Word.class.getName() + "," +
+                               "counter:" + WordCounter.class.getName());
+               propertyMap.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
                propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
-               if (properties.getCommitInterval() != null)
-                       propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
-               if (properties.getCacheMaxBytes() != null)
-                       propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes());
+               if (counterProperties.getCommitInterval() != null)
+                       propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
+               if (counterProperties.getCacheMaxBytes() != null)
+                       propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, counterProperties.getCacheMaxBytes());
                propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
                return propertyMap;
@@ -54,18 +53,16 @@ public class CounterApplicationConfiguriation
 
        @Bean(initMethod = "start", destroyMethod = "stop")
        public CounterStreamProcessor streamProcessor(
-                       CounterApplicationProperties properties,
-                       Properties propertyMap,
+                       CounterApplicationProperties applicationProperties,
+                       Properties streamProcessorProperties,
                        KeyValueBytesStoreSupplier storeSupplier,
-                       ObjectMapper objectMapper,
                        ConfigurableApplicationContext context)
        {
                CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
-                               properties.getInputTopic(),
-                               properties.getOutputTopic(),
-                               propertyMap,
-                               storeSupplier,
-                               objectMapper);
+                               applicationProperties.getInputTopic(),
+                               applicationProperties.getOutputTopic(),
+                               streamProcessorProperties,
+                               storeSupplier);
 
                streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
                {
index d64eb68..b1343a7 100644 (file)
@@ -1,15 +1,15 @@
 package de.juplo.kafka.wordcount.counter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.kstream.*;
 
 import java.util.Properties;
 
@@ -24,14 +24,12 @@ public class CounterStreamProcessor
                        String inputTopic,
                        String outputTopic,
                        Properties properties,
-                       KeyValueBytesStoreSupplier storeSupplier,
-                       ObjectMapper mapper)
+                       KeyValueBytesStoreSupplier storeSupplier)
        {
                Topology topology = CounterStreamProcessor.buildTopology(
                                inputTopic,
                                outputTopic,
-                               storeSupplier,
-                               mapper);
+                               storeSupplier);
 
                streams = new KafkaStreams(topology, properties);
        }
@@ -39,8 +37,7 @@ public class CounterStreamProcessor
        static Topology buildTopology(
                        String inputTopic,
                        String outputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier,
-                       ObjectMapper mapper)
+                       KeyValueBytesStoreSupplier storeSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
@@ -53,7 +50,7 @@ public class CounterStreamProcessor
                                .groupByKey()
                                .count(Materialized.as(storeSupplier))
                                .toStream()
-                               .map((word, count) -> new KeyValue<>(word, WordCount.of(word, count)))
+                               .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
                                .to(outputTopic);
 
                Topology topology = builder.build();
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java
deleted file mode 100644 (file)
index 0767dd1..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.Value;
-
-
-@Value(staticConstructor = "of")
-public class WordCount
-{
-  String user;
-  String word;
-  long count;
-
-  public static WordCount of(Word word, long count)
-  {
-    return new WordCount(word.getUser(), word.getWord(), count);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java
new file mode 100644 (file)
index 0000000..1334e5b
--- /dev/null
@@ -0,0 +1,21 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class WordCounter
+{
+  String user;
+  String word;
+  long counter;
+
+  public static WordCounter of(Word word, long counter)
+  {
+    return new WordCounter(word.getUser(), word.getWord(), counter);
+  }
+}
index b412fe4..78d103c 100644 (file)
@@ -2,7 +2,10 @@ package de.juplo.kafka.wordcount.counter;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeEach;
@@ -13,15 +16,24 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.*;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
 import java.time.Duration;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.*;
-import static org.awaitility.Awaitility.*;
+import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
+import static org.awaitility.Awaitility.await;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.*;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
 
 
 @SpringBootTest(
@@ -41,7 +53,7 @@ public class CounterApplicationIT
        static final int PARTITIONS = 2;
 
        @Autowired
-       KafkaTemplate<String, String> kafkaTemplate;
+       KafkaTemplate<String, Word> kafkaTemplate;
        @Autowired
        Consumer consumer;
 
@@ -67,16 +79,21 @@ public class CounterApplicationIT
        @RequiredArgsConstructor
        static class Consumer
        {
-               private final List<Message> received = new LinkedList<>();
+               private final List<KeyValue<Word, WordCounter>> received = new LinkedList<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-               public synchronized void receive(ConsumerRecord<String, String> record)
+               public synchronized void receive(ConsumerRecord<Word, WordCounter> record)
                {
-                       log.debug("Received message: {}", record);
-                       received.add(Message.of(record.key(),record.value()));
+                       log.debug(
+                                       "Received message: {} -> {}, key: {}, value: {}",
+                                       record.key(),
+                                       record.value(),
+                                       parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+                                       parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
+                       received.add(KeyValue.pair(record.key(),record.value()));
                }
 
-               synchronized List<Message> getReceivedMessages()
+               synchronized List<KeyValue<Word, WordCounter>> getReceivedMessages()
                {
                        return received;
                }
@@ -85,6 +102,45 @@ public class CounterApplicationIT
        @TestConfiguration
        static class Configuration
        {
+               @Bean
+               ProducerFactory<?, ?> producerFactory(Properties streamProcessorProperties)
+               {
+                       Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+                       propertyMap.put(
+                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                                       JsonSerializer.class.getName());
+                       propertyMap.put(
+                                       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                                       JsonSerializer.class.getName());
+
+                       return new DefaultKafkaProducerFactory<>(propertyMap);
+               }
+
+               @Bean
+               ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
+                               Properties streamProcessorProperties)
+               {
+                       Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+                       propertyMap.put(
+                                       ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                                       JsonDeserializer.class.getName());
+                       propertyMap.put(
+                                       ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                                       JsonDeserializer.class.getName());
+
+                       ConsumerFactory<? super Object, ? super Object> consumerFactory =
+                                       new DefaultKafkaConsumerFactory<>(propertyMap);
+
+                       ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory =
+                                       new ConcurrentKafkaListenerContainerFactory<>();
+
+                       kafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
+
+                       return kafkaListenerContainerFactory;
+               }
+
                @Bean
                Consumer consumer()
                {
index c2ada6f..ca2664e 100644 (file)
@@ -1,15 +1,24 @@
 package de.juplo.kafka.wordcount.counter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.serialization.*;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
+import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
 
+
+@Slf4j
 public class CounterStreamProcessorTopologyTest
 {
   public final static String IN = "TEST-IN";
@@ -21,32 +30,46 @@ public class CounterStreamProcessorTopologyTest
     Topology topology = CounterStreamProcessor.buildTopology(
         IN,
         OUT,
-        Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"),
-        new ObjectMapper());
+        Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
 
-    CounterApplicationConfiguriation config =
+    CounterApplicationConfiguriation applicationConfiguriation =
         new CounterApplicationConfiguriation();
-    Properties properties =
-        config.propertyMap(new CounterApplicationProperties());
+    Properties streamProcessorProperties =
+        applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
+    Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+    JsonSerde<?> keySerde = new JsonSerde<>();
+    keySerde.configure(propertyMap, true);
+    JsonSerde<?> valueSerde = new JsonSerde<>();
+    valueSerde.configure(propertyMap, false);
 
-    TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+    TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
 
-    TestInputTopic<String, String> in = testDriver.createInputTopic(
+    TestInputTopic<String, Word> in = testDriver.createInputTopic(
         IN,
-        new StringSerializer(),
-        new StringSerializer());
+        (JsonSerializer<String>)keySerde.serializer(),
+        (JsonSerializer<Word>)valueSerde.serializer());
 
-    TestOutputTopic<String, String> out = testDriver.createOutputTopic(
+    TestOutputTopic<Word, WordCounter> out = testDriver.createOutputTopic(
         OUT,
-        new StringDeserializer(),
-        new StringDeserializer());
+        (JsonDeserializer<Word>)keySerde.deserializer(),
+        (JsonDeserializer<WordCounter>)valueSerde.deserializer());
 
     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
 
-    List<Message> receivedMessages = out
+    List<KeyValue<Word, WordCounter>> receivedMessages = out
         .readRecordsToList()
         .stream()
-        .map(record -> Message.of(record.key(), record.value()))
+        .map(record ->
+        {
+          log.debug(
+              "OUT: {} -> {}, {}, {}",
+              record.key(),
+              record.value(),
+              parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+              parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
+          return KeyValue.pair(record.key(), record.value());
+        })
         .toList();
 
     TestData.assertExpectedResult(receivedMessages);
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/Message.java b/src/test/java/de/juplo/kafka/wordcount/counter/Message.java
deleted file mode 100644 (file)
index 8ed7cc5..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.Value;
-
-
-@Value(staticConstructor = "of")
-public class Message
-{
-  String key;
-  String value;
-}
index 8ff7022..19443ac 100644 (file)
@@ -1,51 +1,58 @@
 package de.juplo.kafka.wordcount.counter;
 
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.KeyValue;
+
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 
 class TestData
 {
-       static void writeInputData(BiConsumer<String, String> consumer)
+       static void writeInputData(BiConsumer<String, Word> consumer)
        {
                consumer.accept(
                                "peter",
-                               "{\"user\":\"peter\",\"word\":\"Hallo\"}");
+                               Word.of("peter","Hallo"));
                consumer.accept(
                                "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"Müsch\"}");
+                               Word.of("klaus","Müsch"));
                consumer.accept(
                                "peter",
-                               "{\"user\":\"peter\",\"word\":\"Welt\"}");
+                               Word.of("peter","Welt"));
                consumer.accept(
                                "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"Müsch\"}");
+                               Word.of("klaus","Müsch"));
                consumer.accept(
                                "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"s\"}");
+                               Word.of("klaus","s"));
                consumer.accept(
                                "peter",
-                               "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+                               Word.of("peter","Boäh"));
                consumer.accept(
                                "peter",
-                               "{\"user\":\"peter\",\"word\":\"Welt\"}");
+                               Word.of("peter","Welt"));
                consumer.accept(
                                "peter",
-                               "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+                               Word.of("peter","Boäh"));
                consumer.accept(
                                "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"s\"}");
+                               Word.of("klaus","s"));
                consumer.accept(
                                "peter",
-                               "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+                               Word.of("peter","Boäh"));
                consumer.accept(
                                "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"s\"}");
+                               Word.of("klaus","s"));
        }
 
-       static void assertExpectedResult(List<Message> receivedMessages)
+       static void assertExpectedResult(List<KeyValue<Word, WordCounter>> receivedMessages)
        {
                assertThat(receivedMessages).hasSize(11);
                assertThat(receivedMessages).containsSubsequence(
@@ -66,40 +73,65 @@ class TestData
                                expectedMessages[9]); // Boäh
        }
 
-       static Message[] expectedMessages =
+       static KeyValue<Word, WordCounter>[] expectedMessages = new KeyValue[]
        {
-                       Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Hallo\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Hallo\",\"count\":1}"),
-                       Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"Müsch\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"Müsch\",\"count\":1}"),
-                       Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Welt\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Welt\",\"count\":1}"),
-                       Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"Müsch\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"Müsch\",\"count\":2}"),
-                       Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"s\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"s\",\"count\":1}"),
-                       Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":1}"),
-                       Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Welt\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Welt\",\"count\":2}"),
-                       Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":2}"),
-                       Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"s\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"s\",\"count\":2}"),
-                       Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":3}"),
-                       Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"s\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"s\",\"count\":3}"),
+                       KeyValue.pair(
+                                       Word.of("peter","Hallo"),
+                                       WordCounter.of("peter","Hallo",1)),
+                       KeyValue.pair(
+                                       Word.of("klaus","Müsch"),
+                                       WordCounter.of("klaus","Müsch",1)),
+                       KeyValue.pair(
+                                       Word.of("peter","Welt"),
+                                       WordCounter.of("peter","Welt",1)),
+                       KeyValue.pair(
+                                       Word.of("klaus","Müsch"),
+                                       WordCounter.of("klaus","Müsch",2)),
+                       KeyValue.pair(
+                                       Word.of("klaus","s"),
+                                       WordCounter.of("klaus","s",1)),
+                       KeyValue.pair(
+                                       Word.of("peter","Boäh"),
+                                       WordCounter.of("peter","Boäh",1)),
+                       KeyValue.pair(
+                                       Word.of("peter","Welt"),
+                                       WordCounter.of("peter","Welt",2)),
+                       KeyValue.pair(
+                                       Word.of("peter","Boäh"),
+                                       WordCounter.of("peter","Boäh",2)),
+                       KeyValue.pair(
+                                       Word.of("klaus","s"),
+                                       WordCounter.of("klaus","s",2)),
+                       KeyValue.pair(
+                                       Word.of("peter","Boäh"),
+                                       WordCounter.of("peter","Boäh",3)),
+                       KeyValue.pair(
+                                       Word.of("klaus","s"),
+                                       WordCounter.of("klaus","s",3)),
        };
+
+       static Map<String, Object> convertToMap(Properties properties)
+       {
+               return properties
+                               .entrySet()
+                               .stream()
+                               .collect(
+                                               Collectors.toMap(
+                                                               entry -> (String)entry.getKey(),
+                                                               entry -> entry.getValue()
+                                               ));
+       }
+
+       static String parseHeader(Headers headers, String key)
+       {
+               Header header = headers.lastHeader(key);
+               if (header == null)
+               {
+                       return key + "=null";
+               }
+               else
+               {
+                       return key + "=" + new String(header.value());
+               }
+       }
 }