import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
Properties properties,
KeyValueBytesStoreSupplier storeSupplier,
ObjectMapper mapper)
+ {
+ Topology topology =
+ CounterStreamProcessor.buildTopology(inputTopic, outputTopic, mapper);
+ streams = new KafkaStreams(topology, properties);
+ }
+
+ static Topology buildTopology(
+ String inputTopic,
+ String outputTopic,
+ ObjectMapper mapper)
{
StreamsBuilder builder = new StreamsBuilder();
.toStream()
.to(outputTopic);
- streams = new KafkaStreams(builder.build(), properties);
+ return builder.build();
}
public void start()
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.streams.*;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+
+public class CounterStreamProcessorTopologyTest
+{
+ public final static String IN = "TEST-IN";
+ public final static String OUT = "TEST-OUT";
+
+ @Test
+ public void test()
+ {
+ ObjectMapper mapper = new ObjectMapper();
+ Topology topology = CounterStreamProcessor.buildTopology(IN, OUT, mapper);
+
+ Properties properties = new Properties();
+ properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
+ properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+ properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+ properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+ properties.put(StreamsConfig.STATE_DIR_CONFIG, "target");
+ properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
+ properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+
+ TestInputTopic<String, String> in = testDriver.createInputTopic(
+ IN,
+ new StringSerializer(),
+ new StringSerializer());
+
+ TestOutputTopic<Key, String> out = testDriver.createOutputTopic(
+ OUT,
+ new JsonDeserializer<Key>(Key.class).ignoreTypeHeaders(),
+ new StringDeserializer());
+
+ TestData.writeInputData((key, value) -> in.pipeInput(key, value));
+
+ List<Message> receivedMessages = out
+ .readRecordsToList()
+ .stream()
+ .map(record -> Message.of(record.key(), record.value()))
+ .toList();
+
+ TestData.assertExpectedResult(receivedMessages);
+ }
+}