From: Kai Moritz Date: Sun, 26 May 2024 07:07:48 +0000 (+0200) Subject: top10: 1.2.1 - (RED) Fixed de-/serialization, turned of caching in IT X-Git-Tag: top10-1.2.1~17 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=df0c22234e9ace115b4e4abc4e3d881d5595668e;p=demos%2Fkafka%2Fwordcount top10: 1.2.1 - (RED) Fixed de-/serialization, turned of caching in IT * Fixed the type-mapping for the `JsonDeserializer`. * Disabled the caching / out-buffering in Streams. * Both changes are necessary, to see in `Top10ApplicationIT`, that messages are processed: _Compare the outcome of the test before and after this commit!_ * Nonetheless, the integration-test still fails, because the assertion is not true. --- diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 6f18339..000db01 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -38,13 +38,15 @@ public class Top10ApplicationConfiguration props.put( JsonDeserializer.TYPE_MAPPINGS, "word:" + Key.class.getName() + "," + - "counter:" + Entry.class.getName()); - props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE); - props.put( - JsonSerializer.TYPE_MAPPINGS, + "counter:" + Entry.class.getName() + "," + "user:" + User.class.getName() + "," + "ranking:" + Ranking.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + if (properties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); + if (properties.getCacheMaxBytes() != null) + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java index 93b78ec..d3bb236 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java @@ -17,4 +17,6 @@ public class Top10ApplicationProperties private String applicationId = "top10"; private String inputTopic = "countings"; private String outputTopic = "top10"; + private Integer commitInterval; + private Integer cacheMaxBytes; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 2d45f03..b35dd3d 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -41,7 +41,11 @@ import static org.awaitility.Awaitility.await; "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10 ", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", + "logging.level.org.apache.kafka.clients=INFO", + "logging.level.org.apache.kafka.streams=INFO", "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.top10.commit-interval=100", + "juplo.wordcount.top10.cacheMaxBytes=0", "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN, "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT }) @EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT })