WIP
authorKai Moritz <kai@juplo.de>
Tue, 14 May 2024 21:52:28 +0000 (23:52 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 14 May 2024 21:52:28 +0000 (23:52 +0200)
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java

index ae161d8..fdb2e6b 100644 (file)
@@ -34,7 +34,7 @@ public class Top10ApplicationConfiguration
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
                props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName());
-               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
+               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Key.class.getName());
                props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Counter.class.getName());
                props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
index 3a744dd..7a03ba0 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.top10;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
@@ -12,8 +11,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
-import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
+import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
 
@@ -29,10 +28,10 @@ public class Top10StreamProcessorTopologyTest
   {
     Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
 
-    Top10ApplicationConfiguriation applicationConfiguriation =
-        new Top10ApplicationConfiguriation();
+    Top10ApplicationConfiguration applicationConfiguriation =
+        new Top10ApplicationConfiguration();
     Properties streamProcessorProperties =
-        applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
+        applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
     Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
 
     JsonSerde<?> keySerde = new JsonSerde<>();
@@ -42,19 +41,19 @@ public class Top10StreamProcessorTopologyTest
 
     TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
 
-    TestInputTopic<String, Word> in = testDriver.createInputTopic(
+    TestInputTopic<Key, Counter> in = testDriver.createInputTopic(
         IN,
-        (JsonSerializer<String>)keySerde.serializer(),
-        (JsonSerializer<Word>)valueSerde.serializer());
+        (JsonSerializer<Key>)keySerde.serializer(),
+        (JsonSerializer<Counter>)valueSerde.serializer());
 
-    TestOutputTopic<Word, WordCounter> out = testDriver.createOutputTopic(
+    TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
         OUT,
-        (JsonDeserializer<Word>)keySerde.deserializer(),
-        (JsonDeserializer<WordCounter>)valueSerde.deserializer());
+        (JsonDeserializer<String>)keySerde.deserializer(),
+        (JsonDeserializer<Ranking>)valueSerde.deserializer());
 
     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
 
-    List<KeyValue<Word, WordCounter>> receivedMessages = out
+    List<KeyValue<String, Ranking>> receivedMessages = out
         .readRecordsToList()
         .stream()
         .map(record ->