counter: 1.2.6 - Removed unused explicit references to `ObjectMapper` counter-1.2.6
authorKai Moritz <kai@juplo.de>
Sun, 12 May 2024 15:43:29 +0000 (17:43 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 14 May 2024 20:30:39 +0000 (22:30 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java

diff --git a/pom.xml b/pom.xml
index 5aa7541..46a7150 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>counter</artifactId>
-       <version>1.2.5</version>
+       <version>1.2.6</version>
        <name>Wordcount-Counter</name>
        <description>Word-counting stream-processor of the multi-user wordcount-example</description>
        <properties>
index 3e2d2e7..409c035 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.wordcount.counter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.StreamsConfig;
@@ -55,15 +54,13 @@ public class CounterApplicationConfiguriation
                        CounterApplicationProperties properties,
                        Properties propertyMap,
                        KeyValueBytesStoreSupplier storeSupplier,
-                       ObjectMapper objectMapper,
                        ConfigurableApplicationContext context)
        {
                CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
                                properties.getInputTopic(),
                                properties.getOutputTopic(),
                                propertyMap,
-                               storeSupplier,
-                               objectMapper);
+                               storeSupplier);
 
                streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
                {
index d64eb68..3dd8c0f 100644 (file)
@@ -1,15 +1,15 @@
 package de.juplo.kafka.wordcount.counter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.kstream.*;
 
 import java.util.Properties;
 
@@ -24,14 +24,12 @@ public class CounterStreamProcessor
                        String inputTopic,
                        String outputTopic,
                        Properties properties,
-                       KeyValueBytesStoreSupplier storeSupplier,
-                       ObjectMapper mapper)
+                       KeyValueBytesStoreSupplier storeSupplier)
        {
                Topology topology = CounterStreamProcessor.buildTopology(
                                inputTopic,
                                outputTopic,
-                               storeSupplier,
-                               mapper);
+                               storeSupplier);
 
                streams = new KafkaStreams(topology, properties);
        }
@@ -39,8 +37,7 @@ public class CounterStreamProcessor
        static Topology buildTopology(
                        String inputTopic,
                        String outputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier,
-                       ObjectMapper mapper)
+                       KeyValueBytesStoreSupplier storeSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
index c2ada6f..2c8bd1e 100644 (file)
@@ -1,8 +1,11 @@
 package de.juplo.kafka.wordcount.counter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.serialization.*;
-import org.apache.kafka.streams.*;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
 
@@ -21,8 +24,7 @@ public class CounterStreamProcessorTopologyTest
     Topology topology = CounterStreamProcessor.buildTopology(
         IN,
         OUT,
-        Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"),
-        new ObjectMapper());
+        Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
 
     CounterApplicationConfiguriation config =
         new CounterApplicationConfiguriation();