import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
- propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
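+ // Default to Spring's JsonSerde, so keys and values are (de)serialized as JSON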
+ propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
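+ // Only trust classes from this application's package during deserialization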
+ propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, Key.class.getPackageName());
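+ // Map the short tokens W, K and C to the concrete classes, so that the
+ // type headers carry a token instead of a fully qualified class name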
+ propertyMap.put(
+ JsonDeserializer.TYPE_MAPPINGS,
+ "W=" + Word.class.getName() + "," +
+ "K=" + Key.class.getName() + "," +
+ "C=" + WordCount.class.getName());
+ propertyMap.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
if (properties.getCommitInterval() != null)
propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
package de.juplo.kafka.wordcount.counter;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.kafka.support.serializer.JsonSerde;
import java.util.Properties;
{
StreamsBuilder builder = new StreamsBuilder();
- KStream<String, String> source = builder.stream(inputTopic);
+ KStream<String, Word> source = builder.stream(
+ inputTopic,
+ Consumed.with(
+ Serdes.String(),
+ new JsonSerde<>(Word.class)
+ .ignoreTypeHeaders()));
+
source
- .map((username, word) ->
- {
- try
- {
- String key = mapper.writeValueAsString(Key.of(username, word));
- return new KeyValue<>(key, word);
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException(e);
- }
- })
- .groupByKey()
- .count(Materialized.as(storeSupplier))
- .mapValues(value->Long.toString(value))
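+ // Re-key the stream by the complete Word (user + word), so that the
+ // subsequent count is maintained per user/word combination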
+ .map((key, word) -> new KeyValue<>(word, word))
+ .groupByKey(Grouped.with(
+ new JsonSerde<>(Word.class)
+ .forKeys()
+ .noTypeInfo(),
+ new JsonSerde<>(Word.class)
+ .noTypeInfo()))
+ .count(Materialized
+ .<Word,Long>as(storeSupplier)
+ .withKeySerde(
+ new JsonSerde<>(Word.class)
+ .forKeys()
+ .noTypeInfo())
+ .withValueSerde(
+ Serdes.Long()))
.toStream()
- .to(outputTopic);
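+ // Turn each (Word, count) pair into a WordCount value for the output topic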
+ .map((word, count) -> new KeyValue<>(word, WordCount.of(word.getUser(), word.getWord(), count)))
+ .to(
+ outputTopic,
+ Produced.with(
+ new JsonSerde<>(Word.class)
+ .forKeys()
+ .noTypeInfo(),
+ new JsonSerde<>(WordCount.class)
+ .noTypeInfo()));
+
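+ // Build the topology explicitly, so that it can be logged for debugging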
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
- return builder.build();
+ return topology;
}
public void start()
@NoArgsConstructor
public class Key
{
- private String username;
+ private String user;
private String word;
}
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Word
+{
+ private String user;
+ private String word;
+}
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.Value;
+
+
+@Value(staticConstructor = "of")
+public class WordCount
+{
+ String user;
+ String word;
+ long count;
+}
@Test
void testSendMessage() throws Exception
{
- TestData.writeInputData((key, value) -> kafkaTemplate.send(TOPIC_IN, key, value));
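+ // Send each (user, word) pair provided by TestData as a JSON-encoded Word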
+ TestData.writeInputData((key, value) ->
+ {
+ try
+ {
+ Word word = new Word();
+ word.setUser("peter");
+ word.setWord("Hallo");
+ kafkaTemplate.send(TOPIC_IN, word.getUser(), mapper.writeValueAsString(word));
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+
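+ // Expected per-user word counts for the test input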
+ Message peter1 = Message.of(
+ Key.of("peter", "Hallo"),
+ WordCount.of("peter", "Hallo", 1L));
+ Message peter2 = Message.of(
+ Key.of("peter", "Welt"),
+ WordCount.of("peter", "Welt", 1L));
+ Message peter3 = Message.of(
+ Key.of("peter", "Boäh"),
+ WordCount.of("peter", "Boäh", 1L));
+ Message peter4 = Message.of(
+ Key.of("peter", "Boäh"),
+ WordCount.of("peter", "Boäh", 2L));
+ Message peter5 = Message.of(
+ Key.of("peter", "Boäh"),
+ WordCount.of("peter", "Boäh", 3L));
+ Message peter6 = Message.of(
+ Key.of("peter", "Welt"),
+ WordCount.of("peter", "Welt", 2L));
+
+ Message klaus1 = Message.of(
+ Key.of("klaus", "Müsch"),
+ WordCount.of("klaus", "Müsch", 1L));
+ Message klaus2 = Message.of(
+ Key.of("klaus", "Müsch"),
+ WordCount.of("klaus", "Müsch", 2L));
+ Message klaus3 = Message.of(
+ Key.of("klaus", "s"),
+ WordCount.of("klaus", "s", 1L));
+ Message klaus4 = Message.of(
+ Key.of("klaus", "s"),
+ WordCount.of("klaus", "s", 2L));
+ Message klaus5 = Message.of(
+ Key.of("klaus", "s"),
+ WordCount.of("klaus", "s", 3L));
await("Expexted converted data")
.atMost(Duration.ofSeconds(10))
{
log.debug("Received message: {}", record);
Key key = mapper.readValue(record.key(), Key.class);
- received.add(Message.of(key,record.value()));
+ WordCount value = mapper.readValue(record.value(), WordCount.class);
+ received.add(key.getUser(), Message.of(key, value));
}
synchronized List<Message> getReceivedMessages()