From: Kai Moritz Date: Sun, 12 May 2024 15:43:29 +0000 (+0200) Subject: counter: 1.2.6 - Removed unused explicit references to `ObjectMapper` X-Git-Tag: counter-1.2.6 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3ff17b238afc765fe944a20ef67a2f6288fd5df2;p=demos%2Fkafka%2Fwordcount counter: 1.2.6 - Removed unused explicit references to `ObjectMapper` --- diff --git a/pom.xml b/pom.xml index 5aa7541..46a7150 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.2.5 + 1.2.6 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java index 3e2d2e7..409c035 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -1,6 +1,5 @@ package de.juplo.kafka.wordcount.counter; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; @@ -55,15 +54,13 @@ public class CounterApplicationConfiguriation CounterApplicationProperties properties, Properties propertyMap, KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper objectMapper, ConfigurableApplicationContext context) { CounterStreamProcessor streamProcessor = new CounterStreamProcessor( properties.getInputTopic(), properties.getOutputTopic(), propertyMap, - storeSupplier, - objectMapper); + storeSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index d64eb68..3dd8c0f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -1,15 +1,15 @@ package de.juplo.kafka.wordcount.counter; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.kstream.*; import java.util.Properties; @@ -24,14 +24,12 @@ public class CounterStreamProcessor String inputTopic, String outputTopic, Properties properties, - KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper mapper) + KeyValueBytesStoreSupplier storeSupplier) { Topology topology = CounterStreamProcessor.buildTopology( inputTopic, outputTopic, - storeSupplier, - mapper); + storeSupplier); streams = new KafkaStreams(topology, properties); } @@ -39,8 +37,7 @@ public class CounterStreamProcessor static Topology buildTopology( String inputTopic, String outputTopic, - KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper mapper) + KeyValueBytesStoreSupplier storeSupplier) { StreamsBuilder builder = new StreamsBuilder(); diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index c2ada6f..2c8bd1e 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -1,8 +1,11 @@ package de.juplo.kafka.wordcount.counter; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.common.serialization.*; -import org.apache.kafka.streams.*; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.Test; @@ -21,8 +24,7 @@ public class CounterStreamProcessorTopologyTest Topology topology = CounterStreamProcessor.buildTopology( IN, OUT, - Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"), - new ObjectMapper()); + Stores.inMemoryKeyValueStore("TOPOLOGY-TEST")); CounterApplicationConfiguriation config = new CounterApplicationConfiguriation();