X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10StreamProcessorTopologyTest.java;h=7a03ba01d6be8b8470d558c30a630243b31ce72b;hb=87f82fe35276666d298bc5100f0810b6aa6ce2d4;hp=8b4a593ba7724afdd29eb2c5eb99ca354c973298;hpb=e7c52491bcef70efd85487b7469a32e832d61724;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 8b4a593..7a03ba0 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.top10; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; @@ -12,8 +11,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import static de.juplo.kafka.wordcount.counter.TestData.convertToMap; -import static de.juplo.kafka.wordcount.counter.TestData.parseHeader; +import static de.juplo.kafka.wordcount.top10.TestData.convertToMap; +import static de.juplo.kafka.wordcount.top10.TestData.parseHeader; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME; @@ -27,15 +26,12 @@ public class Top10StreamProcessorTopologyTest @Test public void test() { - Topology topology = CounterStreamProcessor.buildTopology( - IN, - OUT, - Stores.inMemoryKeyValueStore("TOPOLOGY-TEST")); + Topology topology = Top10StreamProcessor.buildTopology(IN, OUT); - CounterApplicationConfiguriation applicationConfiguriation = - new CounterApplicationConfiguriation(); + Top10ApplicationConfiguration applicationConfiguriation = + new Top10ApplicationConfiguration(); Properties streamProcessorProperties = - applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties()); + applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties()); Map propertyMap = convertToMap(streamProcessorProperties); JsonSerde keySerde = new JsonSerde<>(); @@ -45,19 +41,19 @@ public class Top10StreamProcessorTopologyTest TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties); - TestInputTopic in = testDriver.createInputTopic( + TestInputTopic in = testDriver.createInputTopic( IN, - (JsonSerializer)keySerde.serializer(), - (JsonSerializer)valueSerde.serializer()); + (JsonSerializer)keySerde.serializer(), + (JsonSerializer)valueSerde.serializer()); - TestOutputTopic out = testDriver.createOutputTopic( + TestOutputTopic out = testDriver.createOutputTopic( OUT, - (JsonDeserializer)keySerde.deserializer(), - (JsonDeserializer)valueSerde.deserializer()); + (JsonDeserializer)keySerde.deserializer(), + (JsonDeserializer)valueSerde.deserializer()); TestData.writeInputData((key, value) -> in.pipeInput(key, value)); - List> receivedMessages = out + List> receivedMessages = out .readRecordsToList() .stream() .map(record ->