-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.splitter.InputUser;
+import de.juplo.kafka.wordcount.splitter.InputWord;
+import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
+import de.juplo.kafka.wordcount.stats.OutputWordCounter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import java.util.Map;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
@Slf4j
-public class CounterStreamProcessorTopologyTest
+public class PopularStreamProcessorTopologyTest
{
public static final String IN = "TEST-IN";
public static final String OUT = "TEST-OUT";
static TopologyTestDriver testDriver;
- static MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
+ static MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
@BeforeAll
public static void setUpTestDriver()
{
- Topology topology = CounterStreamProcessor.buildTopology(
+ Topology topology = PopularStreamProcessor.buildTopology(
IN,
OUT,
Stores.inMemoryKeyValueStore(STORE_NAME));
testDriver = new TopologyTestDriver(topology, serializationConfig());
- TestInputTopic<TestInputUser, TestInputWord> in =
+ TestInputTopic<InputUser, InputWord> in =
testDriver.createInputTopic(IN, serializer(), serializer());
- TestOutputTopic<TestOutputWord, TestOutputWordCounter> out =
+ TestOutputTopic<OutputWindowedWord, OutputWordCounter> out =
testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
TestData
return new JsonSerializer().noTypeInfo();
}
- private static JsonDeserializer<TestOutputWord> keyDeserializer()
+ private static JsonDeserializer<OutputWindowedWord> keyDeserializer()
{
return deserializer(true);
}
- private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
+ private static JsonDeserializer<OutputWordCounter> valueDeserializer()
{
return deserializer(false);
}
private static String typeMappingsConfig()
{
- return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class);
+ return PopularStreamProcessor.typeMappingsConfig(OutputWindowedWord.class, OutputWordCounter.class);
}
}