* Changed the type-mapping for `Word` from `word` to `key`.
* Refined the class `Word`, that defines the JSON for the output key.
** Added attribute `type` with fixed value `POPULAR`.
** Renamed attribute `user` to `channel`.
** Renamed attribute `word` to `key`.
* Refined the class `WordCounter`, that defines the JSON for the output
  value.
** Renamed attribute `word` to `key`.
* Adapted test-classes and -cases accordingly.
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>counter</artifactId>
-       <version>1.3.1</version>
+       <version>1.4.0</version>
        <name>Wordcount-Counter</name>
        <description>Word-counting stream-processor of the multi-user wordcount-example</description>
        <properties>
 
 @Slf4j
 public class CounterStreamProcessor
 {
+       public static final String TYPE = "COUNTER";
        public static final String STORE_NAME = "counter";
 
 
 
                builder
                                .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
+                               .mapValues(word -> Word.of(word.getUser(), word.getWord()))
                                .map((key, word) -> new KeyValue<>(word, word))
                                .groupByKey()
                                .count(
                return new JsonSerde<>(User.class);
        }
 
-       public static JsonSerde<Word> inValueSerde()
+       public static JsonSerde<UserWord> inValueSerde()
        {
-               return new JsonSerde<>(Word.class);
+               return new JsonSerde<>(UserWord.class);
        }
 
        public static JsonSerde<Word> outKeySerde()
                return typeMappingsConfig(Word.class, WordCounter.class);
        }
 
-       public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
+       public static String typeMappingsConfig(Class keyClass, Class counterClass)
        {
                return Map.of(
-                                               "word", wordClass,
-                                               "counter", wordCounterClass)
+                                               "key", keyClass,
+                                               "counter", counterClass)
                                .entrySet()
                                .stream()
                                .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
 
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UserWord
+{
+  private String user;
+  private String word;
+}
 
 package de.juplo.kafka.wordcount.counter;
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 
 @Data
-@JsonIgnoreProperties(ignoreUnknown = true)
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
 public class Word
 {
-  private String user;
-  private String word;
+  private final String type = CounterStreamProcessor.TYPE;
+  private String channel;
+  private String key;
 }
 
 public class WordCounter
 {
   String user;
-  String word;
+  String key;
   long counter;
 
   public static WordCounter of(Word word, long counter)
   {
-    return new WordCounter(word.getUser(), word.getWord(), counter);
+    return new WordCounter(word.getChannel(), word.getKey(), counter);
   }
 }
 
                                "spring.kafka.consumer.auto-offset-reset=earliest",
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
+                               "spring.kafka.consumer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
 
 
 import java.util.stream.Stream;
 
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.TYPE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 
        static final String WORD_S = "s";
        static final String WORD_BOÄH = "Boäh";
 
-       static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
-       static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
-       static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
-       static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
-       static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
+       static final TestOutputWord PETER_HALLO = TestOutputWord.of(TYPE, PETER, WORD_HALLO);
+       static final TestOutputWord PETER_WELT = TestOutputWord.of(TYPE, PETER, WORD_WELT);
+       static final TestOutputWord PETER_BOÄH = TestOutputWord.of(TYPE, PETER, WORD_BOÄH);
+       static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(TYPE, KLAUS, WORD_MÜSCH);
+       static final TestOutputWord KLAUS_S = TestOutputWord.of(TYPE, KLAUS, WORD_S);
 
        private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
        {
 
        private static Word wordOf(TestOutputWord testOutputWord)
        {
-               Word word = new Word();
-
-               word.setUser(testOutputWord.getUser());
-               word.setWord(testOutputWord.getWord());
-
-               return word;
+               return Word.of(
+                               testOutputWord.getChannel(),
+                               testOutputWord.getKey());
        }
 
        static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
                        Long counter)
        {
                TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
-                               word.getUser(),
-                               word.getWord(),
+                               word.getChannel(),
+                               word.getKey(),
                                counter);
                assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
        }
 
 @AllArgsConstructor(staticName = "of")
 public class TestOutputWord
 {
-  String user;
-  String word;
+  String type;
+  String channel;
+  String key;
 }
 
 public class TestOutputWordCounter
 {
   String user;
-  String word;
+  String key;
   long counter;
 }