WIP:RED top10
authorKai Moritz <kai@juplo.de>
Tue, 28 May 2024 17:19:33 +0000 (19:19 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 28 May 2024 17:19:33 +0000 (19:19 +0200)
src/test/java/de/juplo/kafka/wordcount/top10/CounterApplicationIT.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java [new file with mode: 0644]

diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/CounterApplicationIT.java
deleted file mode 100644 (file)
index 50c2a50..0000000
+++ /dev/null
@@ -1,132 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import de.juplo.kafka.wordcount.counter.Word;
-import de.juplo.kafka.wordcount.counter.WordCounter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.time.Duration;
-import java.util.stream.Stream;
-
-import static org.awaitility.Awaitility.await;
-
-
-@SpringBootTest(
-               properties = {
-                               "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.Word,counter:de.juplo.kafka.wordcount.counter.WordCounter",
-                               "spring.kafka.consumer.auto-offset-reset=earliest",
-                               "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
-                               "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.top10.RankingData",
-                               "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10   ",
-                               "logging.level.root=WARN",
-                               "logging.level.de.juplo=DEBUG",
-                               "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "juplo.wordcount.top10.commit-interval=0",
-                               "juplo.wordcount.top10.cacheMaxBytes=0",
-                               "juplo.wordcount.top10.input-topic=" + CounterApplicationIT.TOPIC_IN,
-                               "juplo.wordcount.top10.output-topic=" + CounterApplicationIT.TOPIC_OUT })
-@EmbeddedKafka(topics = { CounterApplicationIT.TOPIC_IN, CounterApplicationIT.TOPIC_OUT })
-@Slf4j
-public class CounterApplicationIT
-{
-       public final static String TOPIC_IN = "in";
-       public final static String TOPIC_OUT = "out";
-
-       @Autowired
-       KafkaTemplate<Word, WordCounter> kafkaTemplate;
-       @Autowired
-       Consumer consumer;
-
-
-       @BeforeEach
-       public void clear()
-       {
-               consumer.received.clear();
-       }
-
-
-       @Test
-       void testSendMessage()
-       {
-               Stream
-                               .of(TestData.INPUT_MESSAGES)
-                               .forEach(kv ->
-                               {
-                                       try
-                                       {
-                                               SendResult<Word, WordCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
-                                               log.info(
-                                                               "Sent: {}={}, partition={}, offset={}",
-                                                               result.getProducerRecord().key(),
-                                                               result.getProducerRecord().value(),
-                                                               result.getRecordMetadata().partition(),
-                                                               result.getRecordMetadata().offset());
-                                       }
-                                       catch (Exception e)
-                                       {
-                                               throw new RuntimeException(e);
-                                       }
-                               });
-
-               await("Expexted converted data")
-                               .atMost(Duration.ofSeconds(10))
-                               .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages()));
-       }
-
-
-       static class Consumer
-       {
-               private final MultiValueMap<String, Ranking> received = new LinkedMultiValueMap<>();
-
-               @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-               public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) String user,
-                               @Payload RankingData ranking)
-               {
-                       log.debug("Received message: {} -> {}", user, ranking);
-                       received.add(user, Ranking.of(ranking.entries));
-               }
-
-               synchronized MultiValueMap<String, Ranking> getReceivedMessages()
-               {
-                       return received;
-               }
-       }
-
-       @TestConfiguration
-       static class Configuration
-       {
-               @Bean
-               Consumer consumer()
-               {
-                       return new Consumer();
-               }
-
-               @Primary
-               @Bean
-               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
-               {
-                       return Stores.inMemoryKeyValueStore("TEST-STORE");
-               }
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
new file mode 100644 (file)
index 0000000..10085f5
--- /dev/null
@@ -0,0 +1,132 @@
+package de.juplo.kafka.wordcount.top10;
+
+import de.juplo.kafka.wordcount.counter.Word;
+import de.juplo.kafka.wordcount.counter.WordCounter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.time.Duration;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+                               "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+                               "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.Word,counter:de.juplo.kafka.wordcount.counter.WordCounter",
+                               "spring.kafka.consumer.auto-offset-reset=earliest",
+                               "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+                               "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
+                               "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.top10.RankingData",
+                               "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10   ",
+                               "logging.level.root=WARN",
+                               "logging.level.de.juplo=DEBUG",
+                               "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.top10.commit-interval=0",
+                               "juplo.wordcount.top10.cacheMaxBytes=0",
+                               "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN,
+                               "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT })
+@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT })
+@Slf4j
+public class Top10ApplicationIT
+{
+       public final static String TOPIC_IN = "in";
+       public final static String TOPIC_OUT = "out";
+
+       @Autowired
+       KafkaTemplate<Word, WordCounter> kafkaTemplate;
+       @Autowired
+       Consumer consumer;
+
+
+       @BeforeEach
+       public void clear()
+       {
+               consumer.received.clear();
+       }
+
+
+       @Test
+       void testSendMessage()
+       {
+               Stream
+                               .of(TestData.INPUT_MESSAGES)
+                               .forEach(kv ->
+                               {
+                                       try
+                                       {
+                                               SendResult<Word, WordCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+                                               log.info(
+                                                               "Sent: {}={}, partition={}, offset={}",
+                                                               result.getProducerRecord().key(),
+                                                               result.getProducerRecord().value(),
+                                                               result.getRecordMetadata().partition(),
+                                                               result.getRecordMetadata().offset());
+                                       }
+                                       catch (Exception e)
+                                       {
+                                               throw new RuntimeException(e);
+                                       }
+                               });
+
+               await("Expexted converted data")
+                               .atMost(Duration.ofSeconds(10))
+                               .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages()));
+       }
+
+
+       static class Consumer
+       {
+               private final MultiValueMap<String, Ranking> received = new LinkedMultiValueMap<>();
+
+               @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
+               public synchronized void receive(
+                               @Header(KafkaHeaders.RECEIVED_KEY) String user,
+                               @Payload RankingData ranking)
+               {
+                       log.debug("Received message: {} -> {}", user, ranking);
+                       received.add(user, Ranking.of(ranking.entries));
+               }
+
+               synchronized MultiValueMap<String, Ranking> getReceivedMessages()
+               {
+                       return received;
+               }
+       }
+
+       @TestConfiguration
+       static class Configuration
+       {
+               @Bean
+               Consumer consumer()
+               {
+                       return new Consumer();
+               }
+
+               @Primary
+               @Bean
+               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore("TEST-STORE");
+               }
+       }
+}