</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
- <version>1.1.10</version>
+ <version>1.1.11</version>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
Properties properties,
KeyValueBytesStoreSupplier storeSupplier,
ObjectMapper mapper)
+ {
+ Topology topology = CounterStreamProcessor.buildTopology(
+ inputTopic,
+ outputTopic,
+ storeSupplier,
+ mapper);
+
+ streams = new KafkaStreams(topology, properties);
+ }
+
+ static Topology buildTopology(
+ String inputTopic,
+ String outputTopic,
+ KeyValueBytesStoreSupplier storeSupplier,
+ ObjectMapper mapper)
{
StreamsBuilder builder = new StreamsBuilder();
.toStream()
.to(outputTopic);
- streams = new KafkaStreams(builder.build(), properties);
+ return builder.build();
}
public void start()
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Properties;
+
+
+public class CounterStreamProcessorTopologyTest
+{
+ public final static String IN = "TEST-IN";
+ public final static String OUT = "TEST-OUT";
+
+ @Test
+ public void test()
+ {
+ Topology topology = CounterStreamProcessor.buildTopology(
+ IN,
+ OUT,
+ Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"),
+ new ObjectMapper());
+
+ CounterApplicationConfiguriation config =
+ new CounterApplicationConfiguriation();
+ Properties properties =
+ config.propertyMap(new CounterApplicationProperties());
+
+ TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+
+ TestInputTopic<String, String> in = testDriver.createInputTopic(
+ IN,
+ new StringSerializer(),
+ new StringSerializer());
+
+ TestOutputTopic<String, String> out = testDriver.createOutputTopic(
+ OUT,
+ new StringDeserializer(),
+ new StringDeserializer());
+
+ TestData.writeInputData((key, value) -> in.pipeInput(key, value));
+
+ List<Message> receivedMessages = out
+ .readRecordsToList()
+ .stream()
+ .map(record -> Message.of(record.key(), record.value()))
+ .toList();
+
+ TestData.assertExpectedResult(receivedMessages);
+ }
+}