</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
- <version>1.2.15</version>
+ <version>1.3.0</version>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
+ propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
propertyMap.put(
JsonDeserializer.TYPE_MAPPINGS,
+ "user:" + User.class.getName() + "," +
"word:" + Word.class.getName() + "," +
"counter:" + WordCounter.class.getName());
package de.juplo.kafka.wordcount.counter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
{
StreamsBuilder builder = new StreamsBuilder();
- KStream<String, Word> source = builder.stream(
- inputTopic,
- Consumed.with(Serdes.String(), null));
+ KStream<User, Word> source = builder.stream(inputTopic);
source
.map((key, word) -> new KeyValue<>(word, word))
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+/**
+ * Message key of the counter's input topic, identifying the user a word
+ * belongs to (registered as the default key type and under the type-mapping
+ * alias {@code user} in the streams configuration).
+ * <p>
+ * {@code @JsonIgnoreProperties(ignoreUnknown = true)} lets upstream producers
+ * add fields to the key without breaking deserialization here.
+ */
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class User
+{
+  String user;
+}
package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
@SpringBootTest(
properties = {
+ "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.add.type.headers=false",
"spring.kafka.consumer.auto-offset-reset=earliest",
@BeforeAll
public static void testSendMessage(
- @Autowired KafkaTemplate<String, TestInputWord> kafkaTemplate)
+ @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
{
TestData
.getInputMessages()
{
try
{
- SendResult<String, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
log.info(
"Sent: {}={}, partition={}, offset={}",
result.getProducerRecord().key(),
package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
TopologyTestDriver testDriver;
- TestInputTopic<String, TestInputWord> in;
+ TestInputTopic<TestInputUser, TestInputWord> in;
TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
in = testDriver.createInputTopic(
IN,
- new StringSerializer(),
+ new JsonSerializer().noTypeInfo(),
new JsonSerializer().noTypeInfo());
out = testDriver.createOutputTopic(
package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
- private static final KeyValue<String, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
+ private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
{
new KeyValue<>(
- PETER,
+ TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_HALLO)),
new KeyValue<>(
- KLAUS,
+ TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_MÜSCH)),
new KeyValue<>(
- PETER,
+ TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_WELT)),
new KeyValue<>(
- KLAUS,
+ TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_MÜSCH)),
new KeyValue<>(
- KLAUS,
+ TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
new KeyValue<>(
- PETER,
+ TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
new KeyValue<>(
- PETER,
+ TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_WELT)),
new KeyValue<>(
- PETER,
+ TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
new KeyValue<>(
- KLAUS,
+ TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
new KeyValue<>(
- PETER,
+ TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
new KeyValue<>(
- KLAUS,
+ TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
};
- static Stream<KeyValue<String, TestInputWord>> getInputMessages()
+ static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
{
return Stream.of(TestData.INPUT_MESSAGES);
}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+/**
+ * Test fixture for the record key sent into the counter's input topic,
+ * constructed via the Lombok-generated static factory {@code of(String)}.
+ * <p>
+ * NOTE(review): placed in the {@code splitter} package to mirror the key type
+ * produced upstream — confirm field name and JSON shape actually match the
+ * splitter's wire format.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestInputUser
+{
+  String user;
+}