From a7d74628ddfe72ef2e03e9ae07244ad6e25d91b8 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 15 Jun 2024 23:22:38 +0200 Subject: [PATCH] popular: 1.0.0 - Renamed packages and classes -- ALIGN --- .../wordcount/popular/PopularApplication.java | 6 +- .../PopularApplicationConfiguriation.java | 18 +-- .../popular/PopularApplicationProperties.java | 10 +- .../popular/PopularStreamProcessor.java | 10 +- .../juplo/kafka/wordcount/popular/User.java | 2 +- .../juplo/kafka/wordcount/popular/Word.java | 2 +- .../kafka/wordcount/popular/WordCounter.java | 2 +- src/main/resources/application.yml | 2 +- .../popular/PopularApplicationIT.java | 44 +++--- .../PopularStreamProcessorTopologyTest.java | 30 ++--- .../kafka/wordcount/popular/TestData.java | 126 +++++++++--------- .../kafka/wordcount/splitter/InputUser.java | 2 +- .../kafka/wordcount/splitter/InputWord.java | 2 +- .../wordcount/stats/OutputWindowedWord.java | 4 +- .../wordcount/stats/OutputWordCounter.java | 4 +- src/test/resources/logback-test.xml | 2 +- 16 files changed, 133 insertions(+), 133 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java index e6d3b1f..602f3f6 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java @@ -1,14 +1,14 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.popular; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication -public class CounterApplication +public class PopularApplication { public static void main(String[] args) { - SpringApplication.run(CounterApplication.class, args); + SpringApplication.run(PopularApplication.class, args); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java index 174521f..1c02197 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.popular; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -16,18 +16,18 @@ import org.springframework.kafka.support.serializer.JsonSerde; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @Configuration -@EnableConfigurationProperties(CounterApplicationProperties.class) +@EnableConfigurationProperties(PopularApplicationProperties.class) @Slf4j -public class CounterApplicationConfiguriation +public class PopularApplicationConfiguriation { @Bean public Properties streamProcessorProperties( - CounterApplicationProperties counterProperties) + PopularApplicationProperties counterProperties) { Properties propertyMap = serializationConfig(); @@ -50,19 +50,19 @@ public class CounterApplicationConfiguriation propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName()); + propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, PopularApplication.class.getPackageName()); return propertyMap; } @Bean(initMethod = "start", destroyMethod = "stop") - public CounterStreamProcessor streamProcessor( - CounterApplicationProperties applicationProperties, + public PopularStreamProcessor streamProcessor( + PopularApplicationProperties applicationProperties, Properties streamProcessorProperties, KeyValueBytesStoreSupplier storeSupplier, ConfigurableApplicationContext context) { - CounterStreamProcessor streamProcessor = new CounterStreamProcessor( + PopularStreamProcessor streamProcessor = new PopularStreamProcessor( applicationProperties.getInputTopic(), applicationProperties.getOutputTopic(), streamProcessorProperties, diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java index c3ada17..12f55f5 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.popular; import lombok.Getter; @@ -7,16 +7,16 @@ import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; -@ConfigurationProperties("juplo.wordcount.counter") +@ConfigurationProperties("juplo.wordcount.popular") @Getter @Setter @ToString -public class CounterApplicationProperties +public class PopularApplicationProperties { private String bootstrapServer = "localhost:9092"; - private String applicationId = "counter"; + private String applicationId = "popular"; private String inputTopic = "words"; - private String outputTopic = "countings"; + private String outputTopic = "popular"; private Integer commitInterval; private Integer cacheMaxBytes; } diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java index 2304e55..7bba240 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.popular; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.*; @@ -17,21 +17,21 @@ import java.util.stream.Collectors; @Slf4j -public class CounterStreamProcessor +public class PopularStreamProcessor { - public static final String STORE_NAME = "counter"; + public static final String STORE_NAME = "popular"; public final KafkaStreams streams; - public CounterStreamProcessor( + public PopularStreamProcessor( String inputTopic, String outputTopic, Properties properties, KeyValueBytesStoreSupplier storeSupplier) { - Topology topology = CounterStreamProcessor.buildTopology( + Topology topology = PopularStreamProcessor.buildTopology( inputTopic, outputTopic, storeSupplier); diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/User.java b/src/main/java/de/juplo/kafka/wordcount/popular/User.java index e38bcba..6dbe261 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/User.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/User.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.popular; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/Word.java b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java index 77287d5..92c1d6d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/Word.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.popular; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java index f1fce71..39eebf0 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.popular; import lombok.AccessLevel; import lombok.AllArgsConstructor; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d940f22..059b521 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,5 +1,5 @@ server: - port: 8083 + port: 8087 management: endpoints: web: diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java index 0faa2de..a327389 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java @@ -1,9 +1,9 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.popular; -import de.juplo.kafka.wordcount.splitter.TestInputUser; -import de.juplo.kafka.wordcount.splitter.TestInputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; +import de.juplo.kafka.wordcount.splitter.InputUser; +import de.juplo.kafka.wordcount.splitter.InputWord; +import de.juplo.kafka.wordcount.stats.OutputWindowedWord; +import de.juplo.kafka.wordcount.stats.OutputWordCounter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -26,9 +26,9 @@ import org.springframework.util.MultiValueMap; import java.time.Duration; -import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN; -import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT; -import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_IN; +import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_OUT; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME; import static org.awaitility.Awaitility.await; @@ -41,17 +41,17 @@ import static org.awaitility.Awaitility.await; "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter", + "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.stats.OutputWindowedWord,counter:de.juplo.kafka.wordcount.stats.OutputWordCounter", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", - "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.counter.commit-interval=100", - "juplo.wordcount.counter.cache-max-bytes=0", - "juplo.wordcount.counter.input-topic=" + TOPIC_IN, - "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) + "juplo.wordcount.popular.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.popular.commit-interval=100", + "juplo.wordcount.popular.cache-max-bytes=0", + "juplo.wordcount.popular.input-topic=" + TOPIC_IN, + "juplo.wordcount.popular.output-topic=" + TOPIC_OUT }) @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) @Slf4j -public class CounterApplicationIT +public class PopularApplicationIT { public static final String TOPIC_IN = "in"; public static final String TOPIC_OUT = "out"; @@ -59,12 +59,12 @@ public class CounterApplicationIT @Autowired Consumer consumer; @Autowired - CounterStreamProcessor streamProcessor; + PopularStreamProcessor streamProcessor; @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate kafkaTemplate) { TestData .getInputMessages() @@ -72,7 +72,7 @@ public class CounterApplicationIT { try { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), @@ -129,19 +129,19 @@ public class CounterApplicationIT static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word, - @Payload TestOutputWordCounter counter) + @Header(KafkaHeaders.RECEIVED_KEY) OutputWindowedWord word, + @Payload OutputWordCounter counter) { log.debug("Received message: {} -> {}", word, counter); received.add(word, counter); } synchronized void enforceAssertion( - java.util.function.Consumer> assertion) + java.util.function.Consumer> assertion) { assertion.accept(received); } diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java index e80e383..3e71d3e 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java @@ -1,9 +1,9 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.popular; -import de.juplo.kafka.wordcount.splitter.TestInputUser; -import de.juplo.kafka.wordcount.splitter.TestInputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; +import de.juplo.kafka.wordcount.splitter.InputUser; +import de.juplo.kafka.wordcount.splitter.InputWord; +import de.juplo.kafka.wordcount.stats.OutputWindowedWord; +import de.juplo.kafka.wordcount.stats.OutputWordCounter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; @@ -22,34 +22,34 @@ import org.springframework.util.MultiValueMap; import java.util.Map; -import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig; -import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME; @Slf4j -public class CounterStreamProcessorTopologyTest +public class PopularStreamProcessorTopologyTest { public static final String IN = "TEST-IN"; public static final String OUT = "TEST-OUT"; static TopologyTestDriver testDriver; - static MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + static MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); @BeforeAll public static void setUpTestDriver() { - Topology topology = CounterStreamProcessor.buildTopology( + Topology topology = PopularStreamProcessor.buildTopology( IN, OUT, Stores.inMemoryKeyValueStore(STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); - TestInputTopic in = + TestInputTopic in = testDriver.createInputTopic(IN, serializer(), serializer()); - TestOutputTopic out = + TestOutputTopic out = testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer()); TestData @@ -104,12 +104,12 @@ public class CounterStreamProcessorTopologyTest return new JsonSerializer().noTypeInfo(); } - private static JsonDeserializer keyDeserializer() + private static JsonDeserializer keyDeserializer() { return deserializer(true); } - private static JsonDeserializer valueDeserializer() + private static JsonDeserializer valueDeserializer() { return deserializer(false); } @@ -125,6 +125,6 @@ public class CounterStreamProcessorTopologyTest private static String typeMappingsConfig() { - return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class); + return PopularStreamProcessor.typeMappingsConfig(OutputWindowedWord.class, OutputWordCounter.class); } } diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java index 862eb2b..892ea81 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java @@ -1,9 +1,9 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.popular; -import de.juplo.kafka.wordcount.splitter.TestInputUser; -import de.juplo.kafka.wordcount.splitter.TestInputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; +import de.juplo.kafka.wordcount.splitter.InputUser; +import de.juplo.kafka.wordcount.splitter.InputWord; +import de.juplo.kafka.wordcount.stats.OutputWindowedWord; +import de.juplo.kafka.wordcount.stats.OutputWordCounter; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; @@ -25,55 +25,55 @@ class TestData static final String WORD_S = "s"; static final String WORD_BOÄH = "Boäh"; - static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO); - static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT); - static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH); - static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH); - static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S); + static final OutputWindowedWord PETER_HALLO = OutputWindowedWord.of(PETER, WORD_HALLO); + static final OutputWindowedWord PETER_WELT = OutputWindowedWord.of(PETER, WORD_WELT); + static final OutputWindowedWord PETER_BOÄH = OutputWindowedWord.of(PETER, WORD_BOÄH); + static final OutputWindowedWord KLAUS_MÜSCH = OutputWindowedWord.of(KLAUS, WORD_MÜSCH); + static final OutputWindowedWord KLAUS_S = OutputWindowedWord.of(KLAUS, WORD_S); - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_HALLO)), + InputUser.of(PETER), + InputWord.of(PETER, WORD_HALLO)), KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_MÜSCH)), + InputUser.of(KLAUS), + InputWord.of(KLAUS, WORD_MÜSCH)), KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_WELT)), + InputUser.of(PETER), + InputWord.of(PETER, WORD_WELT)), KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_MÜSCH)), + InputUser.of(KLAUS), + InputWord.of(KLAUS, WORD_MÜSCH)), KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_S)), + InputUser.of(KLAUS), + InputWord.of(KLAUS, WORD_S)), KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_BOÄH)), + InputUser.of(PETER), + InputWord.of(PETER, WORD_BOÄH)), KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_WELT)), + InputUser.of(PETER), + InputWord.of(PETER, WORD_WELT)), KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_BOÄH)), + InputUser.of(PETER), + InputWord.of(PETER, WORD_BOÄH)), KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_S)), + InputUser.of(KLAUS), + InputWord.of(KLAUS, WORD_S)), KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_BOÄH)), + InputUser.of(PETER), + InputWord.of(PETER, WORD_BOÄH)), KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_S)), + InputUser.of(KLAUS), + InputWord.of(KLAUS, WORD_S)), }; - static Stream> getInputMessages() + static Stream> getInputMessages() { return Stream.of(TestData.INPUT_MESSAGES); } - static void assertExpectedMessages(MultiValueMap receivedMessages) + static void assertExpectedMessages(MultiValueMap receivedMessages) { expectedMessages().forEach( (word, counter) -> @@ -81,7 +81,7 @@ class TestData .containsExactlyElementsOf(counter)); } - static void assertExpectedNumberOfMessagesForWord(MultiValueMap receivedMessages) + static void assertExpectedNumberOfMessagesForWord(MultiValueMap receivedMessages) { assertThat(countMessagesForWord(PETER_HALLO, receivedMessages)); assertThat(countMessagesForWord(PETER_WELT, receivedMessages)); @@ -90,7 +90,7 @@ class TestData assertThat(countMessagesForWord(KLAUS_S, receivedMessages)); } - private static int countMessagesForWord(TestOutputWord word, MultiValueMap messagesForUsers) + private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap messagesForUsers) { return messagesForUsers.get(word) == null ? 0 @@ -106,17 +106,17 @@ class TestData assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S))); } - private static Word wordOf(TestOutputWord testOutputWord) + private static Word wordOf(OutputWindowedWord testOutputWindowedWord) { Word word = new Word(); - word.setUser(testOutputWord.getUser()); - word.setWord(testOutputWord.getWord()); + word.setUser(testOutputWindowedWord.getUser()); + word.setWord(testOutputWindowedWord.getWord()); return word; } - static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) + static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) { assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages)); assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages)); @@ -126,10 +126,10 @@ class TestData } private static void assertWordCountEqualsWordCountFromLastMessage( - TestOutputWord word, + OutputWindowedWord word, Long counter) { - TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of( + OutputWordCounter testOutputWordCounter = OutputWordCounter.of( word.getUser(), word.getWord(), counter); @@ -137,20 +137,20 @@ class TestData } private static void assertWordCountEqualsWordCountFromLastMessage( - TestOutputWord word, - TestOutputWordCounter counter) + OutputWindowedWord word, + OutputWordCounter counter) { assertThat(counter).isEqualTo(getLastMessageFor(word)); } - private static TestOutputWordCounter getLastMessageFor(TestOutputWord word) + private static OutputWordCounter getLastMessageFor(OutputWindowedWord word) { return getLastMessageFor(word, expectedMessages()); } - private static TestOutputWordCounter getLastMessageFor( - TestOutputWord user, - MultiValueMap messagesForWord) + private static OutputWordCounter getLastMessageFor( + OutputWindowedWord user, + MultiValueMap messagesForWord) { return messagesForWord .get(user) @@ -158,46 +158,46 @@ class TestData .reduce(null, (left, right) -> right); } - private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( PETER_HALLO, - TestOutputWordCounter.of(PETER, WORD_HALLO,1)), + OutputWordCounter.of(PETER, WORD_HALLO,1)), KeyValue.pair( KLAUS_MÜSCH, - TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)), + OutputWordCounter.of(KLAUS, WORD_MÜSCH,1)), KeyValue.pair( PETER_WELT, - TestOutputWordCounter.of(PETER, WORD_WELT,1)), + OutputWordCounter.of(PETER, WORD_WELT,1)), KeyValue.pair( KLAUS_MÜSCH, - TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)), + OutputWordCounter.of(KLAUS, WORD_MÜSCH,2)), KeyValue.pair( KLAUS_S, - TestOutputWordCounter.of(KLAUS, WORD_S,1)), + OutputWordCounter.of(KLAUS, WORD_S,1)), KeyValue.pair( PETER_BOÄH, - TestOutputWordCounter.of(PETER, WORD_BOÄH,1)), + OutputWordCounter.of(PETER, WORD_BOÄH,1)), KeyValue.pair( PETER_WELT, - TestOutputWordCounter.of(PETER, WORD_WELT,2)), + OutputWordCounter.of(PETER, WORD_WELT,2)), KeyValue.pair( PETER_BOÄH, - TestOutputWordCounter.of(PETER, WORD_BOÄH,2)), + OutputWordCounter.of(PETER, WORD_BOÄH,2)), KeyValue.pair( KLAUS_S, - TestOutputWordCounter.of(KLAUS, WORD_S,2)), + OutputWordCounter.of(KLAUS, WORD_S,2)), KeyValue.pair( PETER_BOÄH, - TestOutputWordCounter.of(PETER, WORD_BOÄH,3)), + OutputWordCounter.of(PETER, WORD_BOÄH,3)), KeyValue.pair( KLAUS_S, - TestOutputWordCounter.of(KLAUS, WORD_S,3)), + OutputWordCounter.of(KLAUS, WORD_S,3)), }; - static MultiValueMap expectedMessages() + static MultiValueMap expectedMessages() { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); Stream .of(EXPECTED_MESSAGES) .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/InputUser.java b/src/test/java/de/juplo/kafka/wordcount/splitter/InputUser.java index 2255b61..483baaa 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/InputUser.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/InputUser.java @@ -8,7 +8,7 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor(staticName = "of") -public class TestInputUser +public class InputUser { String user; } diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/InputWord.java b/src/test/java/de/juplo/kafka/wordcount/splitter/InputWord.java index 71ed1d9..53b6af2 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/InputWord.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/InputWord.java @@ -8,7 +8,7 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor(staticName = "of") -public class TestInputWord +public class InputWord { String user; String word; diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java index cfc2cae..83288fa 100644 --- a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java +++ b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; import lombok.AllArgsConstructor; import lombok.Data; @@ -8,7 +8,7 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor(staticName = "of") -public class TestOutputWord +public class OutputWindowedWord { String user; String word; diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java index 1b59387..b5d2499 100644 --- a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java +++ b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; import lombok.AllArgsConstructor; import lombok.Data; @@ -8,7 +8,7 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor(staticName = "of") -public class TestOutputWordCounter +public class OutputWordCounter { String user; String word; diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 171bf63..0d0c912 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -1,5 +1,5 @@ - + -- 2.20.1