-FROM openjdk:11-jre-slim
+FROM eclipse-temurin:17-jre
COPY target/*.jar /opt/app.jar
EXPOSE 8083
ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>3.0.2</version>
+ <version>3.2.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
- <version>1.2.3</version>
+ <version>1.2.14</version>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
package de.juplo.kafka.wordcount.counter;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
-import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
public class CounterApplicationConfiguriation
{
@Bean
- public Properties propertyMap(CounterApplicationProperties properties)
+ public Properties streamProcessorProperties(CounterApplicationProperties counterProperties)
{
Properties propertyMap = new Properties();
- propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
- propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
+ propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, Word.class.getPackageName());
+ propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
propertyMap.put(
JsonDeserializer.TYPE_MAPPINGS,
- "W:" + Word.class.getName() + "," +
- "C:" + WordCount.class.getName());
+ "word:" + Word.class.getName() + "," +
+ "counter:" + WordCounter.class.getName());
+ propertyMap.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
- if (properties.getCommitInterval() != null)
- propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
- if (properties.getCacheMaxBytes() != null)
- propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes());
+ if (counterProperties.getCommitInterval() != null)
+ propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
+ if (counterProperties.getCacheMaxBytes() != null)
+ propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, counterProperties.getCacheMaxBytes());
propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propertyMap;
@Bean(initMethod = "start", destroyMethod = "stop")
public CounterStreamProcessor streamProcessor(
- CounterApplicationProperties properties,
- Properties propertyMap,
+ CounterApplicationProperties applicationProperties,
+ Properties streamProcessorProperties,
KeyValueBytesStoreSupplier storeSupplier,
- ObjectMapper objectMapper,
ConfigurableApplicationContext context)
{
CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
- properties.getInputTopic(),
- properties.getOutputTopic(),
- propertyMap,
- storeSupplier,
- objectMapper);
+ applicationProperties.getInputTopic(),
+ applicationProperties.getOutputTopic(),
+ streamProcessorProperties,
+ storeSupplier);
streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
{
package de.juplo.kafka.wordcount.counter;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
String inputTopic,
String outputTopic,
Properties properties,
- KeyValueBytesStoreSupplier storeSupplier,
- ObjectMapper mapper)
+ KeyValueBytesStoreSupplier storeSupplier)
{
Topology topology = CounterStreamProcessor.buildTopology(
inputTopic,
outputTopic,
- storeSupplier,
- mapper);
+ storeSupplier);
streams = new KafkaStreams(topology, properties);
}
static Topology buildTopology(
String inputTopic,
String outputTopic,
- KeyValueBytesStoreSupplier storeSupplier,
- ObjectMapper mapper)
+ KeyValueBytesStoreSupplier storeSupplier)
{
StreamsBuilder builder = new StreamsBuilder();
.groupByKey()
.count(Materialized.as(storeSupplier))
.toStream()
- .map((word, count) -> new KeyValue<>(word, WordCount.of(word, count)))
+ .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
.to(outputTopic);
Topology topology = builder.build();
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.Value;
-
-
-@Value(staticConstructor = "of")
-public class WordCount
-{
- String user;
- String word;
- long count;
-
- public static WordCount of(Word word, long count)
- {
- return new WordCount(word.getUser(), word.getWord(), count);
- }
-}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class WordCounter
+{
+ String user;
+ String word;
+ long counter;
+
+ public static WordCounter of(Word word, long counter)
+ {
+ return new WordCounter(word.getUser(), word.getWord(), counter);
+ }
+}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.*;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.*;
-import static org.awaitility.Awaitility.*;
+import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
+import static org.awaitility.Awaitility.await;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.*;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
@SpringBootTest(
static final int PARTITIONS = 2;
@Autowired
- KafkaTemplate<String, String> kafkaTemplate;
+ KafkaTemplate<String, Word> kafkaTemplate;
@Autowired
Consumer consumer;
@RequiredArgsConstructor
static class Consumer
{
- private final List<Message> received = new LinkedList<>();
+ private final List<KeyValue<Word, WordCounter>> received = new LinkedList<>();
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
- public synchronized void receive(ConsumerRecord<String, String> record)
+ public synchronized void receive(ConsumerRecord<Word, WordCounter> record)
{
- log.debug("Received message: {}", record);
- received.add(Message.of(record.key(),record.value()));
+ log.debug(
+ "Received message: {} -> {}, key: {}, value: {}",
+ record.key(),
+ record.value(),
+ parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+ parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
+ received.add(KeyValue.pair(record.key(),record.value()));
}
- synchronized List<Message> getReceivedMessages()
+ synchronized List<KeyValue<Word, WordCounter>> getReceivedMessages()
{
return received;
}
@TestConfiguration
static class Configuration
{
+ @Bean
+ ProducerFactory<?, ?> producerFactory(Properties streamProcessorProperties)
+ {
+ Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+ propertyMap.put(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ JsonSerializer.class.getName());
+ propertyMap.put(
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ JsonSerializer.class.getName());
+
+ return new DefaultKafkaProducerFactory<>(propertyMap);
+ }
+
+ @Bean
+ ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
+ Properties streamProcessorProperties)
+ {
+ Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+ propertyMap.put(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ JsonDeserializer.class.getName());
+ propertyMap.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ JsonDeserializer.class.getName());
+
+ ConsumerFactory<? super Object, ? super Object> consumerFactory =
+ new DefaultKafkaConsumerFactory<>(propertyMap);
+
+ ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+
+ kafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
+
+ return kafkaListenerContainerFactory;
+ }
+
@Bean
Consumer consumer()
{
package de.juplo.kafka.wordcount.counter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.serialization.*;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
+
+@Slf4j
public class CounterStreamProcessorTopologyTest
{
public final static String IN = "TEST-IN";
Topology topology = CounterStreamProcessor.buildTopology(
IN,
OUT,
- Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"),
- new ObjectMapper());
+ Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
- CounterApplicationConfiguriation config =
+ CounterApplicationConfiguriation applicationConfiguriation =
new CounterApplicationConfiguriation();
- Properties properties =
- config.propertyMap(new CounterApplicationProperties());
+ Properties streamProcessorProperties =
+ applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
+ Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+ JsonSerde<?> keySerde = new JsonSerde<>();
+ keySerde.configure(propertyMap, true);
+ JsonSerde<?> valueSerde = new JsonSerde<>();
+ valueSerde.configure(propertyMap, false);
- TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+ TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
- TestInputTopic<String, String> in = testDriver.createInputTopic(
+ TestInputTopic<String, Word> in = testDriver.createInputTopic(
IN,
- new StringSerializer(),
- new StringSerializer());
+ (JsonSerializer<String>)keySerde.serializer(),
+ (JsonSerializer<Word>)valueSerde.serializer());
- TestOutputTopic<String, String> out = testDriver.createOutputTopic(
+ TestOutputTopic<Word, WordCounter> out = testDriver.createOutputTopic(
OUT,
- new StringDeserializer(),
- new StringDeserializer());
+ (JsonDeserializer<Word>)keySerde.deserializer(),
+ (JsonDeserializer<WordCounter>)valueSerde.deserializer());
TestData.writeInputData((key, value) -> in.pipeInput(key, value));
- List<Message> receivedMessages = out
+ List<KeyValue<Word, WordCounter>> receivedMessages = out
.readRecordsToList()
.stream()
- .map(record -> Message.of(record.key(), record.value()))
+ .map(record ->
+ {
+ log.debug(
+ "OUT: {} -> {}, {}, {}",
+ record.key(),
+ record.value(),
+ parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+ parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
+ return KeyValue.pair(record.key(), record.value());
+ })
.toList();
TestData.assertExpectedResult(receivedMessages);
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.Value;
-
-
-@Value(staticConstructor = "of")
-public class Message
-{
- String key;
- String value;
-}
package de.juplo.kafka.wordcount.counter;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.KeyValue;
+
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
class TestData
{
- static void writeInputData(BiConsumer<String, String> consumer)
+ static void writeInputData(BiConsumer<String, Word> consumer)
{
consumer.accept(
"peter",
- "{\"user\":\"peter\",\"word\":\"Hallo\"}");
+ Word.of("peter","Hallo"));
consumer.accept(
"klaus",
- "{\"user\":\"klaus\",\"word\":\"Müsch\"}");
+ Word.of("klaus","Müsch"));
consumer.accept(
"peter",
- "{\"user\":\"peter\",\"word\":\"Welt\"}");
+ Word.of("peter","Welt"));
consumer.accept(
"klaus",
- "{\"user\":\"klaus\",\"word\":\"Müsch\"}");
+ Word.of("klaus","Müsch"));
consumer.accept(
"klaus",
- "{\"user\":\"klaus\",\"word\":\"s\"}");
+ Word.of("klaus","s"));
consumer.accept(
"peter",
- "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+ Word.of("peter","Boäh"));
consumer.accept(
"peter",
- "{\"user\":\"peter\",\"word\":\"Welt\"}");
+ Word.of("peter","Welt"));
consumer.accept(
"peter",
- "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+ Word.of("peter","Boäh"));
consumer.accept(
"klaus",
- "{\"user\":\"klaus\",\"word\":\"s\"}");
+ Word.of("klaus","s"));
consumer.accept(
"peter",
- "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+ Word.of("peter","Boäh"));
consumer.accept(
"klaus",
- "{\"user\":\"klaus\",\"word\":\"s\"}");
+ Word.of("klaus","s"));
}
- static void assertExpectedResult(List<Message> receivedMessages)
+ static void assertExpectedResult(List<KeyValue<Word, WordCounter>> receivedMessages)
{
assertThat(receivedMessages).hasSize(11);
assertThat(receivedMessages).containsSubsequence(
expectedMessages[9]); // Boäh
}
- static Message[] expectedMessages =
+ static KeyValue<Word, WordCounter>[] expectedMessages = new KeyValue[]
{
- Message.of(
- "{\"user\":\"peter\",\"word\":\"Hallo\"}",
- "{\"user\":\"peter\",\"word\":\"Hallo\",\"count\":1}"),
- Message.of(
- "{\"user\":\"klaus\",\"word\":\"Müsch\"}",
- "{\"user\":\"klaus\",\"word\":\"Müsch\",\"count\":1}"),
- Message.of(
- "{\"user\":\"peter\",\"word\":\"Welt\"}",
- "{\"user\":\"peter\",\"word\":\"Welt\",\"count\":1}"),
- Message.of(
- "{\"user\":\"klaus\",\"word\":\"Müsch\"}",
- "{\"user\":\"klaus\",\"word\":\"Müsch\",\"count\":2}"),
- Message.of(
- "{\"user\":\"klaus\",\"word\":\"s\"}",
- "{\"user\":\"klaus\",\"word\":\"s\",\"count\":1}"),
- Message.of(
- "{\"user\":\"peter\",\"word\":\"Boäh\"}",
- "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":1}"),
- Message.of(
- "{\"user\":\"peter\",\"word\":\"Welt\"}",
- "{\"user\":\"peter\",\"word\":\"Welt\",\"count\":2}"),
- Message.of(
- "{\"user\":\"peter\",\"word\":\"Boäh\"}",
- "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":2}"),
- Message.of(
- "{\"user\":\"klaus\",\"word\":\"s\"}",
- "{\"user\":\"klaus\",\"word\":\"s\",\"count\":2}"),
- Message.of(
- "{\"user\":\"peter\",\"word\":\"Boäh\"}",
- "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":3}"),
- Message.of(
- "{\"user\":\"klaus\",\"word\":\"s\"}",
- "{\"user\":\"klaus\",\"word\":\"s\",\"count\":3}"),
+ KeyValue.pair(
+ Word.of("peter","Hallo"),
+ WordCounter.of("peter","Hallo",1)),
+ KeyValue.pair(
+ Word.of("klaus","Müsch"),
+ WordCounter.of("klaus","Müsch",1)),
+ KeyValue.pair(
+ Word.of("peter","Welt"),
+ WordCounter.of("peter","Welt",1)),
+ KeyValue.pair(
+ Word.of("klaus","Müsch"),
+ WordCounter.of("klaus","Müsch",2)),
+ KeyValue.pair(
+ Word.of("klaus","s"),
+ WordCounter.of("klaus","s",1)),
+ KeyValue.pair(
+ Word.of("peter","Boäh"),
+ WordCounter.of("peter","Boäh",1)),
+ KeyValue.pair(
+ Word.of("peter","Welt"),
+ WordCounter.of("peter","Welt",2)),
+ KeyValue.pair(
+ Word.of("peter","Boäh"),
+ WordCounter.of("peter","Boäh",2)),
+ KeyValue.pair(
+ Word.of("klaus","s"),
+ WordCounter.of("klaus","s",2)),
+ KeyValue.pair(
+ Word.of("peter","Boäh"),
+ WordCounter.of("peter","Boäh",3)),
+ KeyValue.pair(
+ Word.of("klaus","s"),
+ WordCounter.of("klaus","s",3)),
};
+
+ static Map<String, Object> convertToMap(Properties properties)
+ {
+ return properties
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ entry -> (String)entry.getKey(),
+ entry -> entry.getValue()
+ ));
+ }
+
+ static String parseHeader(Headers headers, String key)
+ {
+ Header header = headers.lastHeader(key);
+ if (header == null)
+ {
+ return key + "=null";
+ }
+ else
+ {
+ return key + "=" + new String(header.value());
+ }
+ }
}