WIP
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessorTopologyTest.java
index 3a744dd..7a03ba0 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.top10;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
@@ -12,8 +11,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
-import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
+import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
 
@@ -29,10 +28,10 @@ public class Top10StreamProcessorTopologyTest
   {
     Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
 
-    Top10ApplicationConfiguriation applicationConfiguriation =
-        new Top10ApplicationConfiguriation();
+    Top10ApplicationConfiguration applicationConfiguriation =
+        new Top10ApplicationConfiguration();
     Properties streamProcessorProperties =
-        applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
+        applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
     Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
 
     JsonSerde<?> keySerde = new JsonSerde<>();
@@ -42,19 +41,19 @@ public class Top10StreamProcessorTopologyTest
 
     TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
 
-    TestInputTopic<String, Word> in = testDriver.createInputTopic(
+    TestInputTopic<Key, Counter> in = testDriver.createInputTopic(
         IN,
-        (JsonSerializer<String>)keySerde.serializer(),
-        (JsonSerializer<Word>)valueSerde.serializer());
+        (JsonSerializer<Key>)keySerde.serializer(),
+        (JsonSerializer<Counter>)valueSerde.serializer());
 
-    TestOutputTopic<Word, WordCounter> out = testDriver.createOutputTopic(
+    TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
         OUT,
-        (JsonDeserializer<Word>)keySerde.deserializer(),
-        (JsonDeserializer<WordCounter>)valueSerde.deserializer());
+        (JsonDeserializer<String>)keySerde.deserializer(),
+        (JsonDeserializer<Ranking>)valueSerde.deserializer());
 
     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
 
-    List<KeyValue<Word, WordCounter>> receivedMessages = out
+    List<KeyValue<String, Ranking>> receivedMessages = out
         .readRecordsToList()
         .stream()
         .map(record ->