</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
- <version>1.2.5</version>
+ <version>1.2.6</version>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
package de.juplo.kafka.wordcount.counter;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;
CounterApplicationProperties properties,
Properties propertyMap,
KeyValueBytesStoreSupplier storeSupplier,
- ObjectMapper objectMapper,
ConfigurableApplicationContext context)
{
CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
properties.getInputTopic(),
properties.getOutputTopic(),
propertyMap,
- storeSupplier,
- objectMapper);
+ storeSupplier);
streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
{
package de.juplo.kafka.wordcount.counter;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
String inputTopic,
String outputTopic,
Properties properties,
- KeyValueBytesStoreSupplier storeSupplier,
- ObjectMapper mapper)
+ KeyValueBytesStoreSupplier storeSupplier)
{
Topology topology = CounterStreamProcessor.buildTopology(
inputTopic,
outputTopic,
- storeSupplier,
- mapper);
+ storeSupplier);
streams = new KafkaStreams(topology, properties);
}
static Topology buildTopology(
String inputTopic,
String outputTopic,
- KeyValueBytesStoreSupplier storeSupplier,
- ObjectMapper mapper)
+ KeyValueBytesStoreSupplier storeSupplier)
{
StreamsBuilder builder = new StreamsBuilder();
package de.juplo.kafka.wordcount.counter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.serialization.*;
-import org.apache.kafka.streams.*;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Test;
Topology topology = CounterStreamProcessor.buildTopology(
IN,
OUT,
- Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"),
- new ObjectMapper());
+ Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
CounterApplicationConfiguriation config =
new CounterApplicationConfiguriation();