From 2f5828ee2bbd662c3c81c76961d00b871468c8b9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 11 Jun 2024 20:36:58 +0200 Subject: [PATCH 01/16] query: 2.0.0 - Values are serialized as JSON -- works, but is very confusing * The default-type is specified as a consumption-parameter in the command, that reads the input topic into the `KTable` via ``Consumed.with(..)``. * The resulting code is confusing, because the ``Consumed``-parameter is used for both, the consumption of the input topic _and_ the consumption of stored values, if read from the state-store. * Because of this, one might only think of the consumption of the stored values from the state-store, when looking at the ``Consumed.with()``- statement, and argue, why the type-mappings have to be specified here. --- .../query/QueryApplicationConfiguration.java | 1 - .../wordcount/query/QueryStreamProcessor.java | 15 ++++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 7da1712..07b78e4 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -80,7 +80,6 @@ public class QueryApplicationConfiguration props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class.getName()); // << Does not work without this! props.put( JsonDeserializer.TYPE_MAPPINGS, "user:" + Key.class.getName() + "," + diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index cc65fce..0692652 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -13,9 +13,11 @@ import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; import java.net.URI; +import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -53,7 +55,18 @@ public class QueryStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KTable users = builder.table(usersInputTopic); + JsonSerde valueSerde = new JsonSerde(); + valueSerde.configure(Map.of( + JsonDeserializer.TYPE_MAPPINGS, + "user:" + Key.class.getName() + "," + + "ranking:" + Ranking.class.getName() + "," + + "userdata:" + User.class.getName() + "," + + "userranking:" + UserRanking.class.getName() + ), false); + KTable users = builder.table( + usersInputTopic, + Consumed.with(null, valueSerde.copyWithType(User.class)) + ); KStream rankings = builder.stream(rankingInputTopic); rankings -- 2.20.1 From 0648885ec026d7434561060dc7edb703efea6853 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 11 Jun 2024 20:41:30 +0200 Subject: [PATCH 02/16] query: 2.0.0 - Values are serialized as JSON -- works, still startling, but explainable * Splitting the command ``table()``, that reads the input-topic and materializes it as ``KTable()``, into the two statements ``stream()``, that reads the input-topic into a ``KStream``, and ``toTable()``, that turns the `KStream` into a ``KTable``, the ``JsonSerde``, that is specified via ``Consumed.with(..)``, is only used for the serialization and deserialization concerning the ``KTable`` -- not the deserialization of the values, that are read from the input-topic. * Hence, the type-mappings does not have to be specified for the ``JsonSerde``, resulting in better understandable code. * __Note__, that the resulting topology does not differe, because the DSL is able to combine the effects of the two statements. --- .../wordcount/query/QueryStreamProcessor.java | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 0692652..7dacd4b 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -5,7 +5,6 @@ import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -13,11 +12,9 @@ import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; import java.net.URI; -import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -55,18 +52,9 @@ public class QueryStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - JsonSerde valueSerde = new JsonSerde(); - valueSerde.configure(Map.of( - JsonDeserializer.TYPE_MAPPINGS, - "user:" + Key.class.getName() + "," + - "ranking:" + Ranking.class.getName() + "," + - "userdata:" + User.class.getName() + "," + - "userranking:" + UserRanking.class.getName() - ), false); - KTable users = builder.table( - usersInputTopic, - Consumed.with(null, valueSerde.copyWithType(User.class)) - ); + KTable users = builder + .stream(usersInputTopic) + .toTable(Materialized.with(null, new JsonSerde().copyWithType(User.class))); KStream rankings = builder.stream(rankingInputTopic); rankings -- 2.20.1 From fc5d6c6ee08a4b2e29a045bf4071dd0a4d86bc0d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 12 Jun 2024 22:46:24 +0200 Subject: [PATCH 03/16] query: 2.0.0 - Defined 2 state-stores (all state in-memory in tests) * Introduced a second state-store to store the incomming users-table. * Without the explicit definition of the state-store, it is _not_ possible, to reconfigure the integration-test in such a way, taht it does not store its state locally on disk. --- .../query/QueryApplicationConfiguration.java | 19 +++++++++++---- .../wordcount/query/QueryStreamProcessor.java | 24 ++++++++++++------- .../wordcount/query/QueryApplicationIT.java | 16 +++++++++---- .../QueryStreamProcessorTopologyTest.java | 8 ++++--- 4 files changed, 46 insertions(+), 21 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 07b78e4..3bf8326 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -22,7 +22,8 @@ import java.net.Socket; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -95,7 +96,8 @@ public class QueryApplicationConfiguration Properties streamProcessorProperties, HostInfo applicationServer, QueryApplicationProperties applicationProperties, - KeyValueBytesStoreSupplier storeSupplier, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier, ConfigurableApplicationContext context) { QueryStreamProcessor streamProcessor = new QueryStreamProcessor( @@ -103,7 +105,8 @@ public class QueryApplicationConfiguration applicationServer, applicationProperties.getUsersInputTopic(), applicationProperties.getRankingInputTopic(), - storeSupplier); + userStoreSupplier, + rankingStoreSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -120,8 +123,14 @@ public class QueryApplicationConfiguration } @Bean - public KeyValueBytesStoreSupplier storeSupplier() + public KeyValueBytesStoreSupplier userStoreSupplier() { - return Stores.persistentKeyValueStore(STORE_NAME); + return Stores.persistentKeyValueStore(USER_STORE_NAME); + } + + @Bean + public KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.persistentKeyValueStore(RANKING_STORE_NAME); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 7dacd4b..4749264 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -22,7 +22,8 @@ import java.util.Properties; @Slf4j public class QueryStreamProcessor { - public static final String STORE_NAME = "rankings-by-username"; + public static final String USER_STORE_NAME = "users"; + public static final String RANKING_STORE_NAME = "rankings"; public final KafkaStreams streams; public final HostInfo hostInfo; @@ -34,27 +35,34 @@ public class QueryStreamProcessor HostInfo applicationServer, String usersInputTopic, String rankingInputTopic, - KeyValueBytesStoreSupplier storeSupplier) + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier) { Topology topology = buildTopology( usersInputTopic, rankingInputTopic, - storeSupplier); + userStoreSupplier, + rankingStoreSupplier); streams = new KafkaStreams(topology, props); hostInfo = applicationServer; - storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());; + storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());; } static Topology buildTopology( String usersInputTopic, String rankingInputTopic, - KeyValueBytesStoreSupplier storeSupplier) + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier) { StreamsBuilder builder = new StreamsBuilder(); KTable users = builder .stream(usersInputTopic) - .toTable(Materialized.with(null, new JsonSerde().copyWithType(User.class))); + .toTable( + Materialized + .as(userStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(new JsonSerde().copyWithType(User.class))); KStream rankings = builder.stream(rankingInputTopic); rankings @@ -64,7 +72,7 @@ public class QueryStreamProcessor ranking.getEntries())) .toTable( Materialized - .as(storeSupplier) + .as(rankingStoreSupplier) .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); Topology topology = builder.build(); @@ -80,7 +88,7 @@ public class QueryStreamProcessor public Optional getRedirect(String username) { - KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer()); + KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer()); HostInfo activeHost = metadata.activeHost(); log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port()); diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index a9cca10..d800fbd 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -12,7 +12,6 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.http.MediaType; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; @@ -24,13 +23,15 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @SpringBootTest( properties = { + "spring.main.allow-bean-definition-overriding=true", "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking", @@ -129,11 +130,16 @@ public class QueryApplicationIT @TestConfiguration static class Configuration { - @Primary @Bean - KeyValueBytesStoreSupplier inMemoryStoreSupplier() + KeyValueBytesStoreSupplier userStoreSupplier() { - return Stores.inMemoryKeyValueStore(STORE_NAME); + return Stores.inMemoryKeyValueStore(USER_STORE_NAME); + } + + @Bean + KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME); } } } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 845792c..1a857b7 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -24,7 +24,8 @@ public class QueryStreamProcessorTopologyTest { public static final String TOP10_IN = "TOP10-IN"; public static final String USERS_IN = "USERS-IN"; - public static final String STORE_NAME = "TOPOLOGY-TEST"; + public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS"; + public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS"; TopologyTestDriver testDriver; @@ -38,7 +39,8 @@ public class QueryStreamProcessorTopologyTest Topology topology = QueryStreamProcessor.buildTopology( USERS_IN, TOP10_IN, - Stores.inMemoryKeyValueStore(STORE_NAME)); + Stores.inMemoryKeyValueStore(USERS_STORE_NAME), + Stores.inMemoryKeyValueStore(RANKING_STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); @@ -64,7 +66,7 @@ public class QueryStreamProcessorTopologyTest .getTop10Messages() .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + KeyValueStore store = testDriver.getKeyValueStore(RANKING_STORE_NAME); TestData.assertExpectedState(user -> store.get(user)); } -- 2.20.1 From f18423d411650c6f08c9b698b92c33c42bdd670f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 12 Jun 2024 22:46:24 +0200 Subject: [PATCH 04/16] query: 2.0.0 - Configured caching & commit-interval in integration-test * Introduced configuration-parameters for caching and the commit-interval. * Explicitly turned of caching in the integration-test. * Explicitly set the commit-interval to a very short period (100ms) in the integration-test. --- .../wordcount/query/QueryApplicationConfiguration.java | 7 +++++++ .../kafka/wordcount/query/QueryApplicationProperties.java | 2 ++ .../de/juplo/kafka/wordcount/query/QueryApplicationIT.java | 2 ++ 3 files changed, 11 insertions(+) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 3bf8326..2ece744 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -70,6 +70,13 @@ public class QueryApplicationConfiguration props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer()); + if (applicationProperties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval()); + if (applicationProperties.getCacheMaxBytes() != null) + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java index df5f41e..4a9eeca 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java @@ -17,4 +17,6 @@ public class QueryApplicationProperties private String applicationId = "query"; private String rankingInputTopic = "top10"; private String usersInputTopic = "users"; + private Integer commitInterval; + private Integer cacheMaxBytes; } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index d800fbd..58a1206 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -40,6 +40,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. "logging.level.org.apache.kafka.clients=INFO", "logging.level.org.apache.kafka.streams=INFO", "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.query.commit-interval=100", + "juplo.wordcount.query.cache-max-bytes=0", "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) @AutoConfigureMockMvc -- 2.20.1 From 2cc9929cfdccc6974a4c66b27b939fc6c74905e4 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 11 Jun 2024 22:53:11 +0200 Subject: [PATCH 05/16] query: 2.0.0 - Refined the Lombok-Annotations for the model-classes --- .../java/de/juplo/kafka/wordcount/query/Ranking.java | 10 ++-------- src/main/java/de/juplo/kafka/wordcount/query/User.java | 8 ++------ 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java index 69ae3aa..8966be6 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java @@ -1,15 +1,9 @@ package de.juplo.kafka.wordcount.query; -import lombok.Getter; -import lombok.Setter; +import lombok.Data; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; - -@Getter -@Setter +@Data public class Ranking { private Entry[] entries; diff --git a/src/main/java/de/juplo/kafka/wordcount/query/User.java b/src/main/java/de/juplo/kafka/wordcount/query/User.java index fdc0a33..f62b475 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/User.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/User.java @@ -1,14 +1,10 @@ package de.juplo.kafka.wordcount.query; +import lombok.Data; import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -@Getter -@Setter -@ToString +@Data @EqualsAndHashCode(of = "username") public class User { -- 2.20.1 From cc8f4fcf721bcdd4129fa1c3dde6c168d90ac183 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 12 Jun 2024 23:08:57 +0200 Subject: [PATCH 06/16] query: 2.0.0 - `QueryApplicationIT` uses two ``KafkaTemplate``s * Switched the setup of the `QueryApplicationIT` to use two different instances of `KafkaTemplate` for the two input-topics. * This is a preparation for the introduction of the typed JSON-keys. --- .../wordcount/query/QueryApplicationIT.java | 41 +++++++++++++++---- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 58a1206..5eb4706 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -1,7 +1,11 @@ package de.juplo.kafka.wordcount.query; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.users.TestUserData; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.BeforeAll; @@ -14,13 +18,16 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.http.MediaType; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Map; import java.util.concurrent.CompletableFuture; import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; @@ -32,9 +39,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { "spring.main.allow-bean-definition-overriding=true", - "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", - "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", @@ -63,21 +67,22 @@ public class QueryApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate usersKafkaTemplate, + @Autowired KafkaTemplate top10KafkaTemplate) { TestData .getUsersMessages() - .forEach(kv -> flush(kafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); + .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); TestData .getTop10Messages() - .forEach(kv -> flush(kafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); + .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); } - private static void flush(CompletableFuture> future) + private static void flush(CompletableFuture future) { try { - SendResult result = future.get(); + SendResult result = future.get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), @@ -132,6 +137,26 @@ public class QueryApplicationIT @TestConfiguration static class Configuration { + @Bean + KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.TYPE_MAPPINGS, "userdata:" + TestUserData.class.getName()); + return new KafkaTemplate(producerFactory, properties); + } + + @Bean + KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName()); + return new KafkaTemplate(producerFactory, properties); + } + @Bean KeyValueBytesStoreSupplier userStoreSupplier() { -- 2.20.1 From a07e20011b1a22d120920e84c36683a9d42c3ac5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 12 Jun 2024 23:08:57 +0200 Subject: [PATCH 07/16] query: 2.0.0 - (RED) Corrected expectations for the ``users``-input-topic * The messages that are written by the `users` service doese _not_ contain any type-information. * This commits corrects the corresponding expectations in the test-cases. * *RED:* The tests fail, because the implementation was not yet fixed! --- .../de/juplo/kafka/wordcount/query/QueryApplicationIT.java | 3 +-- .../wordcount/query/QueryStreamProcessorTopologyTest.java | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 5eb4706..19ada51 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.query; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.wordcount.top10.TestRanking; -import de.juplo.kafka.wordcount.users.TestUserData; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; @@ -143,7 +142,7 @@ public class QueryApplicationIT Map properties = Map.of( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.TYPE_MAPPINGS, "userdata:" + TestUserData.class.getName()); + JsonSerializer.ADD_TYPE_INFO_HEADERS, false); return new KafkaTemplate(producerFactory, properties); } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 1a857b7..fda7408 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -52,7 +52,7 @@ public class QueryStreamProcessorTopologyTest userIn = testDriver.createInputTopic( USERS_IN, new StringSerializer(), - jsonSerializer(TestUserData.class)); + jsonSerializer(TestUserData.class).noTypeInfo()); } @@ -82,7 +82,6 @@ public class QueryStreamProcessorTopologyTest jsonSerializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, - "userdata:" + TestUserData.class.getName() + "," + "ranking:" + TestRanking.class.getName()), false); return jsonSerializer; -- 2.20.1 From 913c2c4ec0a3584f6be27c8341898d17c4260501 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 12 Jun 2024 23:08:57 +0200 Subject: [PATCH 08/16] query: 2.0.0 - (GREEN) Adjusted the implementation to the new expectations * Messages from the incomming topic, that is written by the `users` service can be serialized, although no type-information is conveyed via the headers. --- .../kafka/wordcount/query/QueryApplicationConfiguration.java | 1 - .../de/juplo/kafka/wordcount/query/QueryStreamProcessor.java | 5 ++++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 2ece744..6c7844d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -92,7 +92,6 @@ public class QueryApplicationConfiguration JsonDeserializer.TYPE_MAPPINGS, "user:" + Key.class.getName() + "," + "ranking:" + Ranking.class.getName() + "," + - "userdata:" + User.class.getName() + "," + "userranking:" + UserRanking.class.getName()); return props; diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 4749264..dcb1234 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -5,6 +5,7 @@ import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -57,7 +58,9 @@ public class QueryStreamProcessor StreamsBuilder builder = new StreamsBuilder(); KTable users = builder - .stream(usersInputTopic) + .stream( + usersInputTopic, + Consumed.with(null, new JsonSerde().copyWithType(User.class))) .toTable( Materialized .as(userStoreSupplier) -- 2.20.1 From 700f80444d14b201f7b696fb5b7bcab0d767f007 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 14 Jun 2024 20:39:41 +0200 Subject: [PATCH 09/16] query: 2.0.0 - (RED) The keys of the top10-topic are deserialized as JSON * The seemingly straightforward change leds to a very strange and inconsisten error-situation. * Only the integration-test fails, while the topology-test works as originally expected. * The cause of the error is a missing serde-config for the key of the ``rankings``-``KStream``, which defies easy explanation. * The best explanation is, that the ``map()``-operation - despite possibly changing the type of the key and/or value - does not by itself define a parameter for specifing a corresponding serialization-config. * The reason for this is, that the operation does not define the complete operation by itself. * In order to take effect, it has to be combined with a second operation, that actually creates the outgoing topic. * Without that second DSL-operation, `map()` simply would yield no action. * And that is, why the serialization has to be defined on that second operation and cannot be defined on `map()` itself. * But the really strange thing about the error is, that it _only_ shows up in `QueryApplicationIT`. * It does not show in `QueryStreamProcessorTopologyTest` _and_ it does _not_ show up, if the application is compiled and started in the docker-setup. * One possible explanation for this wired behaviour might be a bug or misconception in the interpretation of the beforehand build topology, that leads to a non-deterministic behaviour. * Another possible explanation might be subtle differences in the internal caching behaviour -- but that seems unlikely, because tests, that are based on the `TopologyTestDriver` do not cache and are very (on the oposit) very handy if one wants to reveal bugs concerning the serialization and and running the application with the caching settings from the IT does not show the error. --- .../query/QueryApplicationConfiguration.java | 3 +-- .../wordcount/query/QueryStreamProcessor.java | 7 ++++-- .../wordcount/query/QueryApplicationIT.java | 5 ++-- .../QueryStreamProcessorTopologyTest.java | 14 ++++++----- .../juplo/kafka/wordcount/query/TestData.java | 24 +++++++++---------- .../juplo/kafka/wordcount/top10/TestUser.java | 14 +++++++++++ 6 files changed, 43 insertions(+), 24 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 6c7844d..440d5c4 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.query; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; @@ -86,7 +85,7 @@ public class QueryApplicationConfiguration { Properties props = new Properties(); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put( JsonDeserializer.TYPE_MAPPINGS, diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index dcb1234..bf27e2d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -60,13 +60,15 @@ public class QueryStreamProcessor KTable users = builder .stream( usersInputTopic, - Consumed.with(null, new JsonSerde().copyWithType(User.class))) + Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class))) .toTable( Materialized .as(userStoreSupplier) .withKeySerde(Serdes.String()) .withValueSerde(new JsonSerde().copyWithType(User.class))); - KStream rankings = builder.stream(rankingInputTopic); + KStream rankings = builder + .stream(rankingInputTopic) + .map((key, value) -> new KeyValue<>(key.getUsername(), value)); rankings .join(users, (ranking, user) -> UserRanking.of( @@ -76,6 +78,7 @@ public class QueryStreamProcessor .toTable( Materialized .as(rankingStoreSupplier) + .withKeySerde(Serdes.String()) .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); Topology topology = builder.build(); diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 19ada51..1315eae 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.query; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; @@ -150,9 +151,9 @@ public class QueryApplicationIT KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) { Map properties = Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName()); + JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); return new KafkaTemplate(producerFactory, properties); } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index fda7408..203c813 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -1,6 +1,7 @@ package de.juplo.kafka.wordcount.query; import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; import de.juplo.kafka.wordcount.users.TestUserData; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.StringSerializer; @@ -29,7 +30,7 @@ public class QueryStreamProcessorTopologyTest TopologyTestDriver testDriver; - TestInputTopic top10In; + TestInputTopic top10In; TestInputTopic userIn; @@ -46,13 +47,13 @@ public class QueryStreamProcessorTopologyTest top10In = testDriver.createInputTopic( TOP10_IN, - new StringSerializer(), - jsonSerializer(TestRanking.class)); + jsonSerializer(TestUser.class, true), + jsonSerializer(TestRanking.class,false)); userIn = testDriver.createInputTopic( USERS_IN, new StringSerializer(), - jsonSerializer(TestUserData.class).noTypeInfo()); + jsonSerializer(TestUserData.class, false).noTypeInfo()); } @@ -76,14 +77,15 @@ public class QueryStreamProcessorTopologyTest testDriver.close(); } - private JsonSerializer jsonSerializer(Class type) + private JsonSerializer jsonSerializer(Class type, boolean isKey) { JsonSerializer jsonSerializer = new JsonSerializer<>(); jsonSerializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, + "user:" + TestUser.class.getName() + "," + "ranking:" + TestRanking.class.getName()), - false); + isKey); return jsonSerializer; } } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index 1fe34d9..c190eed 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.query; import de.juplo.kafka.wordcount.top10.TestEntry; import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; import de.juplo.kafka.wordcount.users.TestUserData; import org.apache.kafka.streams.KeyValue; @@ -14,10 +15,10 @@ import static org.assertj.core.api.Assertions.assertThat; class TestData { - static final String PETER = "peter"; - static final String KLAUS = "klaus"; + static final TestUser PETER = TestUser.of("peter"); + static final TestUser KLAUS = TestUser.of("klaus"); - static final Stream> getTop10Messages() + static final Stream> getTop10Messages() { return Stream.of(TOP10_MESSAGES); } @@ -29,8 +30,8 @@ class TestData static void assertExpectedState(Function function) { - assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER)); - assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS)); + assertRankingEqualsRankingFromLastMessage(PETER.getUsername(), function.apply(PETER.getUsername())); + assertRankingEqualsRankingFromLastMessage(KLAUS.getUsername(), function.apply(KLAUS.getUsername())); } private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) @@ -41,7 +42,7 @@ class TestData private static UserRanking getLastMessageFor(String user) { return getTop10Messages() - .filter(kv -> kv.key.equals(user)) + .filter(kv -> kv.key.getUsername().equals(user)) .map(kv -> kv.value) .map(testRanking -> userRankingFor(user, testRanking)) .reduce(null, (left, right) -> right); @@ -72,8 +73,7 @@ class TestData entry.setCount(testEntry.getCount()); return entry; } - - private static KeyValue[] TOP10_MESSAGES = new KeyValue[] + private static KeyValue[] TOP10_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 PETER, @@ -136,10 +136,10 @@ class TestData private static KeyValue[] USERS_MESSAGES = new KeyValue[] { KeyValue.pair( - PETER, - TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)), + PETER.getUsername(), + TestUserData.of(PETER.getUsername(), "Peter", "Pan", TestUserData.Sex.MALE)), KeyValue.pair( - KLAUS, - TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)), + KLAUS.getUsername(), + TestUserData.of(KLAUS.getUsername(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), }; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java new file mode 100644 index 0000000..cc48496 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestUser +{ + String username; +} -- 2.20.1 From f929a47a66a2c14a291133aaa624ad9c5696bd53 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 15 Jun 2024 07:34:30 +0200 Subject: [PATCH 10/16] query: 2.0.0 - (GREEN) Explicitly specifed the missing serde-config --- .../juplo/kafka/wordcount/query/QueryStreamProcessor.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index bf27e2d..3a1665f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -5,10 +5,7 @@ import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; @@ -74,7 +71,8 @@ public class QueryStreamProcessor .join(users, (ranking, user) -> UserRanking.of( user.getFirstName(), user.getLastName(), - ranking.getEntries())) + ranking.getEntries()), + Joined.keySerde(Serdes.String())) .toTable( Materialized .as(rankingStoreSupplier) -- 2.20.1 From 035b1f29fc38866e699452aeabda9675cef24c82 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Jun 2024 21:35:58 +0200 Subject: [PATCH 11/16] query: 2.0.1 - (RED) Corrected expectations for JSON in input-messages --- pom.xml | 2 +- .../de/juplo/kafka/wordcount/query/TestData.java | 16 ++++++++-------- .../juplo/kafka/wordcount/top10/TestEntry.java | 2 +- .../de/juplo/kafka/wordcount/top10/TestUser.java | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pom.xml b/pom.xml index 3edc6d3..707d82d 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount query - 2.0.0 + 2.0.1 Wordcount-Query Query stream-processor of the multi-user wordcount-example diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index c190eed..cf58173 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -30,8 +30,8 @@ class TestData static void assertExpectedState(Function function) { - assertRankingEqualsRankingFromLastMessage(PETER.getUsername(), function.apply(PETER.getUsername())); - assertRankingEqualsRankingFromLastMessage(KLAUS.getUsername(), function.apply(KLAUS.getUsername())); + assertRankingEqualsRankingFromLastMessage(PETER.getUser(), function.apply(PETER.getUser())); + assertRankingEqualsRankingFromLastMessage(KLAUS.getUser(), function.apply(KLAUS.getUser())); } private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) @@ -42,7 +42,7 @@ class TestData private static UserRanking getLastMessageFor(String user) { return getTop10Messages() - .filter(kv -> kv.key.getUsername().equals(user)) + .filter(kv -> kv.key.getUser().equals(user)) .map(kv -> kv.value) .map(testRanking -> userRankingFor(user, testRanking)) .reduce(null, (left, right) -> right); @@ -70,7 +70,7 @@ class TestData { Entry entry = new Entry(); entry.setWord(testEntry.getWord()); - entry.setCount(testEntry.getCount()); + entry.setCount(testEntry.getCounter()); return entry; } private static KeyValue[] TOP10_MESSAGES = new KeyValue[] @@ -136,10 +136,10 @@ class TestData private static KeyValue[] USERS_MESSAGES = new KeyValue[] { KeyValue.pair( - PETER.getUsername(), - TestUserData.of(PETER.getUsername(), "Peter", "Pan", TestUserData.Sex.MALE)), + PETER.getUser(), + TestUserData.of(PETER.getUser(), "Peter", "Pan", TestUserData.Sex.MALE)), KeyValue.pair( - KLAUS.getUsername(), - TestUserData.of(KLAUS.getUsername(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), + KLAUS.getUser(), + TestUserData.of(KLAUS.getUser(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), }; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java index 215327f..ceafa82 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java @@ -11,5 +11,5 @@ import lombok.NoArgsConstructor; public class TestEntry { String word; - long count; + long counter; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java index cc48496..cc63c34 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java @@ -10,5 +10,5 @@ import lombok.NoArgsConstructor; @Data public class TestUser { - String username; + String user; } -- 2.20.1 From b52ff02b6952dfc708c7c7d94205b5b24afd68f9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Jun 2024 21:38:51 +0200 Subject: [PATCH 12/16] query: 2.0.1 - (GREEN) Adjusted implementation to corrected expectations --- src/main/java/de/juplo/kafka/wordcount/query/Entry.java | 2 +- src/main/java/de/juplo/kafka/wordcount/query/Key.java | 3 +-- .../de/juplo/kafka/wordcount/query/QueryStreamProcessor.java | 2 +- src/test/java/de/juplo/kafka/wordcount/query/TestData.java | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java index 80b4daf..4be314c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java @@ -7,5 +7,5 @@ import lombok.Data; public class Entry { private String word; - private Long count; + private Long counter; } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Key.java b/src/main/java/de/juplo/kafka/wordcount/query/Key.java index afeac4a..57d095a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Key.java @@ -6,6 +6,5 @@ import lombok.Data; @Data public class Key { - private String username; - private String word; + private String user; } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 3a1665f..e075eb7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -65,7 +65,7 @@ public class QueryStreamProcessor .withValueSerde(new JsonSerde().copyWithType(User.class))); KStream rankings = builder .stream(rankingInputTopic) - .map((key, value) -> new KeyValue<>(key.getUsername(), value)); + .map((key, value) -> new KeyValue<>(key.getUser(), value)); rankings .join(users, (ranking, user) -> UserRanking.of( diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index cf58173..f5b8a00 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -70,7 +70,7 @@ class TestData { Entry entry = new Entry(); entry.setWord(testEntry.getWord()); - entry.setCount(testEntry.getCounter()); + entry.setCounter(testEntry.getCounter()); return entry; } private static KeyValue[] TOP10_MESSAGES = new KeyValue[] -- 2.20.1 From d835b70fc4d49a50f42da9c21b3de1dbcd18cbaf Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 22 Jun 2024 19:05:45 +0200 Subject: [PATCH 13/16] query: 2.1.0 - Refined input JSON (adapted to general format for stats) * Adapted the app to the new type-mapping `stats` for the incomming keys. * Refined the class `Key`, that defines the JSON for the incomming key. ** Renamed attribute `user` to `channel`. ** Added attribute `type` of type `String`. * Refined the class `Entry`, that defines the JSON of an entry in the ranking, that is defined in the class `Ranking`. ** Renamed attribute `word` to `key`. * The `QueryStreamProcessor` filters the incomming messages by the field `type` of the `Key`: all messages are dropped, that are not of type `COUNTER`. --- pom.xml | 2 +- .../de/juplo/kafka/wordcount/query/Entry.java | 2 +- .../de/juplo/kafka/wordcount/query/Key.java | 3 ++- .../query/QueryApplicationConfiguration.java | 2 +- .../wordcount/query/QueryStreamProcessor.java | 4 +++- .../wordcount/query/QueryApplicationIT.java | 2 +- .../QueryStreamProcessorTopologyTest.java | 2 +- .../juplo/kafka/wordcount/query/TestData.java | 21 ++++++++++--------- .../kafka/wordcount/top10/TestEntry.java | 2 +- .../juplo/kafka/wordcount/top10/TestUser.java | 3 ++- 10 files changed, 24 insertions(+), 19 deletions(-) diff --git a/pom.xml b/pom.xml index 707d82d..ff775f7 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount query - 2.0.1 + 2.1.0 Wordcount-Query Query stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java index 4be314c..383b1a6 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java @@ -6,6 +6,6 @@ import lombok.Data; @Data public class Entry { - private String word; + private String key; private Long counter; } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Key.java b/src/main/java/de/juplo/kafka/wordcount/query/Key.java index 57d095a..a2d85a1 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Key.java @@ -6,5 +6,6 @@ import lombok.Data; @Data public class Key { - private String user; + private String type; + private String channel; } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 440d5c4..0f9cad1 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -89,7 +89,7 @@ public class QueryApplicationConfiguration props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put( JsonDeserializer.TYPE_MAPPINGS, - "user:" + Key.class.getName() + "," + + "stats:" + Key.class.getName() + "," + "ranking:" + Ranking.class.getName() + "," + "userranking:" + UserRanking.class.getName()); diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index e075eb7..5543a91 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -20,6 +20,7 @@ import java.util.Properties; @Slf4j public class QueryStreamProcessor { + public static final String STATS_TYPE = "COUNTER"; public static final String USER_STORE_NAME = "users"; public static final String RANKING_STORE_NAME = "rankings"; @@ -65,7 +66,8 @@ public class QueryStreamProcessor .withValueSerde(new JsonSerde().copyWithType(User.class))); KStream rankings = builder .stream(rankingInputTopic) - .map((key, value) -> new KeyValue<>(key.getUser(), value)); + .filter((key, value) -> STATS_TYPE.equals(key.getType())) + .map((key, value) -> new KeyValue<>(key.getChannel(), value)); rankings .join(users, (ranking, user) -> UserRanking.of( diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 1315eae..fb12aee 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -153,7 +153,7 @@ public class QueryApplicationIT Map properties = Map.of( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); + JsonSerializer.TYPE_MAPPINGS, "stats:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); return new KafkaTemplate(producerFactory, properties); } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 203c813..fbeb19b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -83,7 +83,7 @@ public class QueryStreamProcessorTopologyTest jsonSerializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, - "user:" + TestUser.class.getName() + "," + + "stats:" + TestUser.class.getName() + "," + "ranking:" + TestRanking.class.getName()), isKey); return jsonSerializer; diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index f5b8a00..44162a0 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -10,13 +10,14 @@ import java.util.Arrays; import java.util.function.Function; import java.util.stream.Stream; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE; import static org.assertj.core.api.Assertions.assertThat; class TestData { - static final TestUser PETER = TestUser.of("peter"); - static final TestUser KLAUS = TestUser.of("klaus"); + static final TestUser PETER = TestUser.of(STATS_TYPE, "peter"); + static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus"); static final Stream> getTop10Messages() { @@ -30,8 +31,8 @@ class TestData static void assertExpectedState(Function function) { - assertRankingEqualsRankingFromLastMessage(PETER.getUser(), function.apply(PETER.getUser())); - assertRankingEqualsRankingFromLastMessage(KLAUS.getUser(), function.apply(KLAUS.getUser())); + assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel())); + assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel())); } private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) @@ -42,7 +43,7 @@ class TestData private static UserRanking getLastMessageFor(String user) { return getTop10Messages() - .filter(kv -> kv.key.getUser().equals(user)) + .filter(kv -> kv.key.getChannel().equals(user)) .map(kv -> kv.value) .map(testRanking -> userRankingFor(user, testRanking)) .reduce(null, (left, right) -> right); @@ -69,7 +70,7 @@ class TestData private static Entry entryOf(TestEntry testEntry) { Entry entry = new Entry(); - entry.setWord(testEntry.getWord()); + entry.setKey(testEntry.getKey()); entry.setCounter(testEntry.getCounter()); return entry; } @@ -136,10 +137,10 @@ class TestData private static KeyValue[] USERS_MESSAGES = new KeyValue[] { KeyValue.pair( - PETER.getUser(), - TestUserData.of(PETER.getUser(), "Peter", "Pan", TestUserData.Sex.MALE)), + PETER.getChannel(), + TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)), KeyValue.pair( - KLAUS.getUser(), - TestUserData.of(KLAUS.getUser(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), + KLAUS.getChannel(), + TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), }; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java index ceafa82..15a8aa4 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java @@ -10,6 +10,6 @@ import lombok.NoArgsConstructor; @Data public class TestEntry { - String word; + String key; long counter; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java index cc63c34..e58786a 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java @@ -10,5 +10,6 @@ import lombok.NoArgsConstructor; @Data public class TestUser { - String user; + String type; + String channel; } -- 2.20.1 From b223786b36be5a1b1cb02235f90d1ae0e5ffc56c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 23 Jun 2024 10:41:24 +0200 Subject: [PATCH 14/16] query: 2.1.1 - Upgraded JDK-version, Spring Boot & Docker --- Dockerfile | 2 +- pom.xml | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2fa241b..f416a44 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM eclipse-temurin:17-jre +FROM eclipse-temurin:21-jre-alpine COPY target/*.jar /opt/app.jar EXPOSE 8085 ENTRYPOINT ["java", "-jar", "/opt/app.jar"] diff --git a/pom.xml b/pom.xml index ff775f7..6a51d72 100644 --- a/pom.xml +++ b/pom.xml @@ -5,16 +5,17 @@ org.springframework.boot spring-boot-starter-parent - 3.2.5 + 3.2.7 de.juplo.kafka.wordcount query - 2.1.0 + 2.1.1 Wordcount-Query Query stream-processor of the multi-user wordcount-example - 0.33.0 + 21 + 0.44.0 -- 2.20.1 From bcb04c8b890c6b4233835151960cd69b77297af7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 23 Jun 2024 11:48:14 +0200 Subject: [PATCH 15/16] query: 2.1.2 - RocksDB does nor work in Alpine-Linux --- Dockerfile | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index f416a44..640032a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM eclipse-temurin:21-jre-alpine +FROM eclipse-temurin:21-jre COPY target/*.jar /opt/app.jar EXPOSE 8085 ENTRYPOINT ["java", "-jar", "/opt/app.jar"] diff --git a/pom.xml b/pom.xml index 6a51d72..035e095 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount query - 2.1.1 + 2.1.2 Wordcount-Query Query stream-processor of the multi-user wordcount-example -- 2.20.1 From 9c1b63783704ee2adf031441ad13bfaba65f5969 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 25 Jun 2024 07:08:32 +0200 Subject: [PATCH 16/16] query: 2.1.2 - Refined tests (added messages, that are filtered) --- .../java/de/juplo/kafka/wordcount/query/TestData.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index 44162a0..7c71312 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -18,6 +18,7 @@ class TestData { static final TestUser PETER = TestUser.of(STATS_TYPE, "peter"); static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus"); + static final TestUser OTHER_CHANNEL = TestUser.of("POPULAR", "klaus"); static final Stream> getTop10Messages() { @@ -84,6 +85,10 @@ class TestData KLAUS, TestRanking.of( TestEntry.of("Müsch", 1l))), + KeyValue.pair( // 1 + OTHER_CHANNEL, + TestRanking.of( + TestEntry.of("Müsch", 1l))), KeyValue.pair( // 2 PETER, TestRanking.of( @@ -121,6 +126,11 @@ class TestData TestRanking.of( TestEntry.of("Müsch", 2l), TestEntry.of("s", 2l))), + KeyValue.pair( // 8 + OTHER_CHANNEL, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 2l))), KeyValue.pair( // 9 PETER, TestRanking.of( -- 2.20.1