popular: 1.0.0 - Renamed packages and classes -- ALIGN
authorKai Moritz <kai@juplo.de>
Sat, 15 Jun 2024 12:49:41 +0000 (14:49 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 15 Jun 2024 12:49:41 +0000 (14:49 +0200)
13 files changed:
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java
src/main/java/de/juplo/kafka/wordcount/popular/PopularController.java
src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/popular/Ranking.java
src/main/java/de/juplo/kafka/wordcount/popular/Word.java
src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java
src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java
src/test/java/de/juplo/kafka/wordcount/counter/TestWordCounter.java
src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/popular/TestData.java

index eeee7eb..602f3f6 100644 (file)
@@ -1,14 +1,14 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 
 @SpringBootApplication
-public class QueryApplication
+public class PopularApplication
 {
        public static void main(String[] args)
        {
-               SpringApplication.run(QueryApplication.class, args);
+               SpringApplication.run(PopularApplication.class, args);
        }
 }
index 440d5c4..1997d4e 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -21,20 +21,20 @@ import java.net.Socket;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.USER_STORE_NAME;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
 @Configuration
-@EnableConfigurationProperties(QueryApplicationProperties.class)
+@EnableConfigurationProperties(PopularApplicationProperties.class)
 @Slf4j
-public class QueryApplicationConfiguration
+public class PopularApplicationConfiguration
 {
        @Bean
        public HostInfo applicationServer(
                        ServerProperties serverProperties,
-                       QueryApplicationProperties applicationProperties) throws IOException
+                       PopularApplicationProperties applicationProperties) throws IOException
        {
                String host;
                if (serverProperties.getAddress() == null)
@@ -56,7 +56,7 @@ public class QueryApplicationConfiguration
 
        @Bean
        public Properties streamProcessorProperties(
-                       QueryApplicationProperties applicationProperties,
+                       PopularApplicationProperties applicationProperties,
                        HostInfo applicationServer)
        {
                Properties props = new Properties();
@@ -89,23 +89,22 @@ public class QueryApplicationConfiguration
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(
                                JsonDeserializer.TYPE_MAPPINGS,
-                               "user:" + Key.class.getName() + "," +
-                               "ranking:" + Ranking.class.getName() + "," +
-                               "userranking:" + UserRanking.class.getName());
+                               "word:" + Word.class.getName() + "," +
+                               "ranking:" + Ranking.class.getName());
 
                return props;
        }
 
        @Bean(initMethod = "start", destroyMethod = "stop")
-       public QueryStreamProcessor streamProcessor(
+       public PopularStreamProcessor streamProcessor(
                        Properties streamProcessorProperties,
                        HostInfo applicationServer,
-                       QueryApplicationProperties applicationProperties,
+                       PopularApplicationProperties applicationProperties,
                        KeyValueBytesStoreSupplier userStoreSupplier,
                        KeyValueBytesStoreSupplier rankingStoreSupplier,
                        ConfigurableApplicationContext context)
        {
-               QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
+               PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
                                streamProcessorProperties,
                                applicationServer,
                                applicationProperties.getUsersInputTopic(),
index 4a9eeca..eb1e0ca 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
 
 
 import lombok.Getter;
@@ -7,16 +7,15 @@ import lombok.ToString;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
 
-@ConfigurationProperties("juplo.wordcount.query")
+@ConfigurationProperties("juplo.wordcount.popular")
 @Getter
 @Setter
 @ToString
-public class QueryApplicationProperties
+public class PopularApplicationProperties
 {
   private String bootstrapServer = "localhost:9092";
-  private String applicationId = "query";
-  private String rankingInputTopic = "top10";
-  private String usersInputTopic = "users";
+  private String applicationId = "popular";
+  private String inputTopic = "countings";
   private Integer commitInterval;
   private Integer cacheMaxBytes;
 }
index a9b5b80..1a3f120 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -14,12 +14,12 @@ import java.util.Optional;
 
 @RestController
 @RequiredArgsConstructor
-public class QueryController
+public class PopularController
 {
-  private final QueryStreamProcessor processor;
+  private final PopularStreamProcessor processor;
 
   @GetMapping("{username}")
-  ResponseEntity<UserRanking> queryFor(@PathVariable String username)
+  ResponseEntity<Ranking> queryFor(@PathVariable String username)
   {
     Optional<URI> redirect = processor.getRedirect(username);
     if (redirect.isPresent())
@@ -33,7 +33,7 @@ public class QueryController
 
     try
     {
-      return ResponseEntity.of(processor.getUserRanking(username));
+      return ResponseEntity.of(processor.getRanking(username));
     }
     catch (InvalidStateStoreException e)
     {
index e075eb7..1c24e7a 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
 
 import jakarta.annotation.PostConstruct;
 import jakarta.annotation.PreDestroy;
@@ -18,17 +18,17 @@ import java.util.Properties;
 
 
 @Slf4j
-public class QueryStreamProcessor
+public class PopularStreamProcessor
 {
        public static final String USER_STORE_NAME = "users";
        public static final String RANKING_STORE_NAME = "rankings";
 
        public final KafkaStreams streams;
        public final HostInfo hostInfo;
-       public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
+       public final StoreQueryParameters<ReadOnlyKeyValueStore<String, Ranking>> storeParameters;
 
 
-       public QueryStreamProcessor(
+       public PopularStreamProcessor(
                        Properties props,
                        HostInfo applicationServer,
                        String usersInputTopic,
@@ -43,7 +43,7 @@ public class QueryStreamProcessor
                                rankingStoreSupplier);
                streams = new KafkaStreams(topology, props);
                hostInfo = applicationServer;
-               storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());;
+               storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());
        }
 
        static Topology buildTopology(
@@ -64,8 +64,8 @@ public class QueryStreamProcessor
                                                                .withKeySerde(Serdes.String())
                                                                .withValueSerde(new JsonSerde().copyWithType(User.class)));
                KStream<String, Ranking> rankings = builder
-                               .<Key, Ranking>stream(rankingInputTopic)
-                               .map((key, value) -> new KeyValue<>(key.getUser(), value));
+                               .<Word, Ranking>stream(rankingInputTopic)
+                               .map((word, value) -> new KeyValue<>(word.getUser(), value));
 
                rankings
                                .join(users, (ranking, user) -> UserRanking.of(
@@ -85,7 +85,7 @@ public class QueryStreamProcessor
                return topology;
        }
 
-       ReadOnlyKeyValueStore<String, UserRanking> getStore()
+       ReadOnlyKeyValueStore<String, Ranking> getStore()
        {
                return streams.store(storeParameters);
        }
@@ -106,7 +106,7 @@ public class QueryStreamProcessor
                return Optional.of(location);
        }
 
-       public Optional<UserRanking> getUserRanking(String username)
+       public Optional<Ranking> getRanking(String username)
        {
                return Optional.ofNullable(getStore().get(username));
        }
index 8966be6..07a9323 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
 
 import lombok.Data;
 
@@ -6,5 +6,5 @@ import lombok.Data;
 @Data
 public class Ranking
 {
-  private Entry[] entries;
+  private WordCounter[] entries;
 }
index 57d095a..1b6bf32 100644 (file)
@@ -1,10 +1,10 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
 
 import lombok.Data;
 
 
 @Data
-public class Key
+public class Word
 {
   private String user;
 }
index 80b4daf..9199ec9 100644 (file)
@@ -1,10 +1,10 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
 
 import lombok.Data;
 
 
 @Data
-public class Entry
+public class WordCounter
 {
   private String word;
   private Long count;
index cc63c34..f899d8c 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.counter;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -8,7 +8,8 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor(staticName = "of")
 @NoArgsConstructor
 @Data
-public class TestUser
+public class TestWord
+
 {
   String user;
 }
index 215327f..ef4820f 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.counter;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor(staticName = "of")
 @NoArgsConstructor
 @Data
-public class TestEntry
+public class TestWordCounter
 {
   String word;
   long count;
index 1315eae..bf172e0 100644 (file)
@@ -1,8 +1,8 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
+import de.juplo.kafka.wordcount.counter.TestRanking;
+import de.juplo.kafka.wordcount.counter.TestWord;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -30,8 +30,8 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.USER_STORE_NAME;
 import static org.awaitility.Awaitility.await;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
@@ -46,12 +46,12 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
                                "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
                                "juplo.wordcount.query.commit-interval=100",
                                "juplo.wordcount.query.cache-max-bytes=0",
-                               "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
-                               "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
+                               "juplo.wordcount.query.users-input-topic=" + PopularApplicationIT.TOPIC_USERS,
+                               "juplo.wordcount.query.ranking-input-topic=" + PopularApplicationIT.TOPIC_TOP10 })
 @AutoConfigureMockMvc
-@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
+@EmbeddedKafka(topics = { PopularApplicationIT.TOPIC_TOP10, PopularApplicationIT.TOPIC_USERS})
 @Slf4j
-public class QueryApplicationIT
+public class PopularApplicationIT
 {
        public static final String TOPIC_TOP10 = "top10";
        public static final String TOPIC_USERS = "users";
@@ -62,7 +62,7 @@ public class QueryApplicationIT
        @Autowired
        ObjectMapper objectMapper;
        @Autowired
-       QueryStreamProcessor streamProcessor;
+       PopularStreamProcessor streamProcessor;
 
 
        @BeforeAll
@@ -153,7 +153,7 @@ public class QueryApplicationIT
                        Map<String, Object> properties = Map.of(
                                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
                                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
-                                       JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
+                                       JsonSerializer.TYPE_MAPPINGS, "user:" + TestWord.class.getName() + ",ranking:" + TestRanking.class.getName());
                        return new KafkaTemplate(producerFactory, properties);
                }
 
index 203c813..32fb0bf 100644 (file)
@@ -1,7 +1,7 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
 
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
+import de.juplo.kafka.wordcount.counter.TestRanking;
+import de.juplo.kafka.wordcount.counter.TestWord;
 import de.juplo.kafka.wordcount.users.TestUserData;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -17,11 +17,11 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.Map;
 
-import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguration.serializationConfig;
 
 
 @Slf4j
-public class QueryStreamProcessorTopologyTest
+public class PopularStreamProcessorTopologyTest
 {
   public static final String TOP10_IN = "TOP10-IN";
   public static final String USERS_IN = "USERS-IN";
@@ -30,14 +30,14 @@ public class QueryStreamProcessorTopologyTest
 
 
   TopologyTestDriver testDriver;
-  TestInputTopic<TestUser, TestRanking> top10In;
+  TestInputTopic<TestWord, TestRanking> top10In;
   TestInputTopic<String, TestUserData> userIn;
 
 
   @BeforeEach
   public void setUp()
   {
-    Topology topology = QueryStreamProcessor.buildTopology(
+    Topology topology = PopularStreamProcessor.buildTopology(
         USERS_IN,
         TOP10_IN,
         Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
@@ -47,7 +47,7 @@ public class QueryStreamProcessorTopologyTest
 
     top10In = testDriver.createInputTopic(
         TOP10_IN,
-        jsonSerializer(TestUser.class, true),
+        jsonSerializer(TestWord.class, true),
         jsonSerializer(TestRanking.class,false));
 
     userIn = testDriver.createInputTopic(
@@ -83,7 +83,7 @@ public class QueryStreamProcessorTopologyTest
     jsonSerializer.configure(
         Map.of(
             JsonSerializer.TYPE_MAPPINGS,
-            "user:" + TestUser.class.getName() + "," +
+            "user:" + TestWord.class.getName() + "," +
             "ranking:" + TestRanking.class.getName()),
         isKey);
     return jsonSerializer;
index 7c8b0b4..1bceecc 100644 (file)
@@ -1,8 +1,8 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
 
-import de.juplo.kafka.wordcount.top10.TestEntry;
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
+import de.juplo.kafka.wordcount.counter.TestWordCounter;
+import de.juplo.kafka.wordcount.counter.TestRanking;
+import de.juplo.kafka.wordcount.counter.TestWord;
 import de.juplo.kafka.wordcount.users.TestUserData;
 import org.apache.kafka.streams.KeyValue;
 
@@ -15,10 +15,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class TestData
 {
-       static final TestUser PETER = TestUser.of("peter");
-       static final TestUser KLAUS = TestUser.of("klaus");
+       static final TestWord PETER = TestWord.of("peter");
+       static final TestWord KLAUS = TestWord.of("klaus");
 
-       static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
+       static final Stream<KeyValue<TestWord, TestRanking>> getTop10Messages()
        {
                return Stream.of(TOP10_MESSAGES);
        }
@@ -55,10 +55,10 @@ class TestData
                                .map(kv -> kv.value)
                                .reduce(null, (left, right) -> right);
 
-               Entry[] entries = Arrays
+               WordCounter[] entries = Arrays
                                .stream(testRanking.getEntries())
                                .map(testEntry -> entryOf(testEntry))
-                               .toArray(size -> new Entry[size]);
+                               .toArray(size -> new WordCounter[size]);
 
                return UserRanking.of(
                                testUserData.getFirstName(),
@@ -66,71 +66,71 @@ class TestData
                                entries);
        }
 
-       private static Entry entryOf(TestEntry testEntry)
+       private static WordCounter entryOf(TestWordCounter testEntry)
        {
-               Entry entry = new Entry();
-               entry.setWord(testEntry.getWord());
-               entry.setCount(testEntry.getCount());
-               return entry;
+               WordCounter wordCounter = new WordCounter();
+               wordCounter.setWord(testEntry.getWord());
+               wordCounter.setCount(testEntry.getCount());
+               return wordCounter;
        }
-       private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+       private static KeyValue<TestWord, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair( // 0
                                        PETER,
                                        TestRanking.of(
-                                                       TestEntry.of("Hallo", 1l))),
+                                                       TestWordCounter.of("Hallo", 1l))),
                        KeyValue.pair( // 1
                                        KLAUS,
                                        TestRanking.of(
-                                                       TestEntry.of("Müsch", 1l))),
+                                                       TestWordCounter.of("Müsch", 1l))),
                        KeyValue.pair( // 2
                                        PETER,
                                        TestRanking.of(
-                                                       TestEntry.of("Hallo", 1l),
-                                                       TestEntry.of("Welt", 1l))),
+                                                       TestWordCounter.of("Hallo", 1l),
+                                                       TestWordCounter.of("Welt", 1l))),
                        KeyValue.pair( // 3
                                        KLAUS,
                                        TestRanking.of(
-                                                       TestEntry.of("Müsch", 2l))),
+                                                       TestWordCounter.of("Müsch", 2l))),
                        KeyValue.pair( // 4
                                        KLAUS,
                                        TestRanking.of(
-                                                       TestEntry.of("Müsch", 2l),
-                                                       TestEntry.of("s", 1l))),
+                                                       TestWordCounter.of("Müsch", 2l),
+                                                       TestWordCounter.of("s", 1l))),
                        KeyValue.pair( // 5
                                        PETER,
                                        TestRanking.of(
-                                                       TestEntry.of("Hallo", 1l),
-                                                       TestEntry.of("Welt", 1l),
-                                                       TestEntry.of("Boäh", 1l))),
+                                                       TestWordCounter.of("Hallo", 1l),
+                                                       TestWordCounter.of("Welt", 1l),
+                                                       TestWordCounter.of("Boäh", 1l))),
                        KeyValue.pair( // 6
                                        PETER,
                                        TestRanking.of(
-                                                       TestEntry.of("Welt", 2l),
-                                                       TestEntry.of("Hallo", 1l),
-                                                       TestEntry.of("Boäh", 1l))),
+                                                       TestWordCounter.of("Welt", 2l),
+                                                       TestWordCounter.of("Hallo", 1l),
+                                                       TestWordCounter.of("Boäh", 1l))),
                        KeyValue.pair( // 7
                                        PETER,
                                        TestRanking.of(
-                                                       TestEntry.of("Welt", 2l),
-                                                       TestEntry.of("Boäh", 2l),
-                                                       TestEntry.of("Hallo", 1l))),
+                                                       TestWordCounter.of("Welt", 2l),
+                                                       TestWordCounter.of("Boäh", 2l),
+                                                       TestWordCounter.of("Hallo", 1l))),
                        KeyValue.pair( // 8
                                        KLAUS,
                                        TestRanking.of(
-                                                       TestEntry.of("Müsch", 2l),
-                                                       TestEntry.of("s", 2l))),
+                                                       TestWordCounter.of("Müsch", 2l),
+                                                       TestWordCounter.of("s", 2l))),
                        KeyValue.pair( // 9
                                        PETER,
                                        TestRanking.of(
-                                                       TestEntry.of("Boäh", 3l),
-                                                       TestEntry.of("Welt", 2l),
-                                                       TestEntry.of("Hallo", 1l))),
+                                                       TestWordCounter.of("Boäh", 3l),
+                                                       TestWordCounter.of("Welt", 2l),
+                                                       TestWordCounter.of("Hallo", 1l))),
                        KeyValue.pair( // 10
                                        KLAUS,
                                        TestRanking.of(
-                                                       TestEntry.of("s", 3l),
-                                                       TestEntry.of("Müsch", 2l))),
+                                                       TestWordCounter.of("s", 3l),
+                                                       TestWordCounter.of("Müsch", 2l))),
        };
 
        private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]