WIP:test
authorKai Moritz <kai@juplo.de>
Sat, 11 Feb 2023 12:02:44 +0000 (13:02 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 12 Feb 2023 13:54:42 +0000 (14:54 +0100)
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/counter/Key.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java [new file with mode: 0644]

index b19502d..3c34159 100644 (file)
@@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
@@ -25,6 +26,16 @@ public class CounterStreamProcessor
                        Properties properties,
                        KeyValueBytesStoreSupplier storeSupplier,
                        ObjectMapper mapper)
+       {
+               Topology topology =
+                               CounterStreamProcessor.buildTopology(inputTopic, outputTopic, mapper);
+               streams = new KafkaStreams(topology, properties);
+       }
+
+       static Topology buildTopology(
+                       String inputTopic,
+                       String outputTopic,
+                       ObjectMapper mapper)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
@@ -48,7 +59,7 @@ public class CounterStreamProcessor
                                .toStream()
                                .to(outputTopic);
 
-               streams = new KafkaStreams(builder.build(), properties);
+               return builder.build();
        }
 
        public void start()
index 1e00dca..f926efe 100644 (file)
@@ -1,11 +1,16 @@
 package de.juplo.kafka.wordcount.counter;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 import lombok.Value;
 
 
-@Value(staticConstructor = "of")
+@Data
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
 public class Key
 {
-  private final String username;
-  private final String word;
+  private String username;
+  private String word;
 }
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
new file mode 100644 (file)
index 0000000..8fc08a1
--- /dev/null
@@ -0,0 +1,64 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.streams.*;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+
+public class CounterStreamProcessorTopologyTest
+{
+  public final static String IN = "TEST-IN";
+  public final static String OUT = "TEST-OUT";
+
+  @Test
+  public void test()
+  {
+    ObjectMapper mapper = new ObjectMapper();
+    Topology topology = CounterStreamProcessor.buildTopology(IN, OUT, mapper);
+
+    Properties properties = new Properties();
+    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
+    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+    properties.put(StreamsConfig.STATE_DIR_CONFIG, "target");
+    properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
+    properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+    TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+
+    TestInputTopic<String, String> in = testDriver.createInputTopic(
+        IN,
+        new StringSerializer(),
+        new StringSerializer());
+
+    TestOutputTopic<Key, String> out = testDriver.createOutputTopic(
+        OUT,
+        new JsonDeserializer<Key>(Key.class).ignoreTypeHeaders(),
+        new StringDeserializer());
+
+    TestData.writeInputData((key, value) -> in.pipeInput(key, value));
+
+    List<Message> receivedMessages = out
+        .readRecordsToList()
+        .stream()
+        .map(record -> Message.of(record.key(), record.value()))
+        .toList();
+
+    TestData.assertExpectedResult(receivedMessages);
+  }
+}