* Changed the type-mapping for `Word` from `word` to `key`.
* Refined the class `Word`, that defines the JSON for the output key.
** Added attribute `type` with fixed value `POPULAR`.
** Renamed attribute `user` to `channel`.
** Renamed attribute `word` to `key`.
* Refined the class `WordCounter`, that defines the JSON for the output
value.
** Renamed attribute `word` to `key`.
* Adapted test-classes and -cases accordingly.
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
- <version>1.3.1</version>
+ <version>1.4.0</version>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
@Slf4j
public class CounterStreamProcessor
{
+ public static final String TYPE = "COUNTER";
public static final String STORE_NAME = "counter";
builder
.stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
+ .mapValues(word -> Word.of(word.getUser(), word.getWord()))
.map((key, word) -> new KeyValue<>(word, word))
.groupByKey()
.count(
return new JsonSerde<>(User.class);
}
- public static JsonSerde<Word> inValueSerde()
+ public static JsonSerde<UserWord> inValueSerde()
{
- return new JsonSerde<>(Word.class);
+ return new JsonSerde<>(UserWord.class);
}
public static JsonSerde<Word> outKeySerde()
return typeMappingsConfig(Word.class, WordCounter.class);
}
- public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
+ public static String typeMappingsConfig(Class keyClass, Class counterClass)
{
return Map.of(
- "word", wordClass,
- "counter", wordCounterClass)
+ "key", keyClass,
+ "counter", counterClass)
.entrySet()
.stream()
.map(entry -> entry.getKey() + ":" + entry.getValue().getName())
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UserWord
+{
+ private String user;
+ private String word;
+}
package de.juplo.kafka.wordcount.counter;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
public class Word
{
- private String user;
- private String word;
+ private final String type = CounterStreamProcessor.TYPE;
+ private String channel;
+ private String key;
}
public class WordCounter
{
String user;
- String word;
+ String key;
long counter;
public static WordCounter of(Word word, long counter)
{
- return new WordCounter(word.getUser(), word.getWord(), counter);
+ return new WordCounter(word.getChannel(), word.getKey(), counter);
}
}
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
+ "spring.kafka.consumer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
import java.util.stream.Stream;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.TYPE;
import static org.assertj.core.api.Assertions.assertThat;
static final String WORD_S = "s";
static final String WORD_BOÄH = "Boäh";
- static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
- static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
- static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
- static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
- static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
+ static final TestOutputWord PETER_HALLO = TestOutputWord.of(TYPE, PETER, WORD_HALLO);
+ static final TestOutputWord PETER_WELT = TestOutputWord.of(TYPE, PETER, WORD_WELT);
+ static final TestOutputWord PETER_BOÄH = TestOutputWord.of(TYPE, PETER, WORD_BOÄH);
+ static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(TYPE, KLAUS, WORD_MÜSCH);
+ static final TestOutputWord KLAUS_S = TestOutputWord.of(TYPE, KLAUS, WORD_S);
private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
{
private static Word wordOf(TestOutputWord testOutputWord)
{
- Word word = new Word();
-
- word.setUser(testOutputWord.getUser());
- word.setWord(testOutputWord.getWord());
-
- return word;
+ return Word.of(
+ testOutputWord.getChannel(),
+ testOutputWord.getKey());
}
static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
Long counter)
{
TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
- word.getUser(),
- word.getWord(),
+ word.getChannel(),
+ word.getKey(),
counter);
assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
}
@AllArgsConstructor(staticName = "of")
public class TestOutputWord
{
- String user;
- String word;
+ String type;
+ String channel;
+ String key;
}
public class TestOutputWordCounter
{
String user;
- String word;
+ String key;
long counter;
}