X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10StreamProcessorTopologyTest.java;h=7a03ba01d6be8b8470d558c30a630243b31ce72b;hb=87f82fe35276666d298bc5100f0810b6aa6ce2d4;hp=3a744ddf67f3b16375d65556b50ff01c19fd8208;hpb=a01b5436757600a44b20260798d7331c7e33a851;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 3a744dd..7a03ba0 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.top10; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; @@ -12,8 +11,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import static de.juplo.kafka.wordcount.counter.TestData.convertToMap; -import static de.juplo.kafka.wordcount.counter.TestData.parseHeader; +import static de.juplo.kafka.wordcount.top10.TestData.convertToMap; +import static de.juplo.kafka.wordcount.top10.TestData.parseHeader; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME; @@ -29,10 +28,10 @@ public class Top10StreamProcessorTopologyTest { Topology topology = Top10StreamProcessor.buildTopology(IN, OUT); - Top10ApplicationConfiguriation applicationConfiguriation = - new Top10ApplicationConfiguriation(); + Top10ApplicationConfiguration applicationConfiguriation = + new Top10ApplicationConfiguration(); Properties streamProcessorProperties = - applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties()); + applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties()); Map propertyMap = convertToMap(streamProcessorProperties); JsonSerde keySerde = new JsonSerde<>(); @@ -42,19 +41,19 @@ public class Top10StreamProcessorTopologyTest TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties); - TestInputTopic in = testDriver.createInputTopic( + TestInputTopic in = testDriver.createInputTopic( IN, - (JsonSerializer)keySerde.serializer(), - (JsonSerializer)valueSerde.serializer()); + (JsonSerializer)keySerde.serializer(), + (JsonSerializer)valueSerde.serializer()); - TestOutputTopic out = testDriver.createOutputTopic( + TestOutputTopic out = testDriver.createOutputTopic( OUT, - (JsonDeserializer)keySerde.deserializer(), - (JsonDeserializer)valueSerde.deserializer()); + (JsonDeserializer)keySerde.deserializer(), + (JsonDeserializer)valueSerde.deserializer()); TestData.writeInputData((key, value) -> in.pipeInput(key, value)); - List> receivedMessages = out + List> receivedMessages = out .readRecordsToList() .stream() .map(record ->