</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
- <version>1.2.8</version>
+ <version>1.2.9</version>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
-import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- propertyMap.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
- propertyMap.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
if (counterProperties.getCommitInterval() != null)
propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
package de.juplo.kafka.wordcount.counter;
-import lombok.Value;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-@Value(staticConstructor = "of")
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
public class WordCount
{
String user;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.*;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
private final List<Message> received = new LinkedList<>();
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
- public synchronized void receive(ConsumerRecord<String, String> record)
+ public synchronized void receive(ConsumerRecord<Word, WordCount> record)
{
log.debug("Received message: {}", record);
received.add(Message.of(record.key(),record.value()));
return new DefaultKafkaProducerFactory<>(propertyMap);
}
+ @Bean
+ ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
+ Properties streamProcessorProperties)
+ {
+ Map<String, Object> propertyMap = streamProcessorProperties
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ entry -> (String)entry.getKey(),
+ entry -> entry.getValue()
+ ));
+
+ propertyMap.put(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ JsonDeserializer.class.getName());
+ propertyMap.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ JsonDeserializer.class.getName());
+
+ ConsumerFactory<? super Object, ? super Object> consumerFactory =
+ new DefaultKafkaConsumerFactory<>(propertyMap);
+
+ ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+
+ kafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
+
+ return kafkaListenerContainerFactory;
+ }
+
@Bean
Consumer consumer()
{
package de.juplo.kafka.wordcount.counter;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
public class CounterStreamProcessorTopologyTest
Properties streamProcessorProperties =
applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
+ Map<String, ?> propertyMap = streamProcessorProperties
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ entry -> (String)entry.getKey(),
+ entry -> entry.getValue()
+ ));
+ JsonSerde<?> keySerde = new JsonSerde<>();
+ keySerde.configure(propertyMap, true);
+ JsonSerde<?> valueSerde = new JsonSerde<>();
+ valueSerde.configure(propertyMap, false);
+
TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
TestInputTopic<String, Word> in = testDriver.createInputTopic(
IN,
- new JsonSerializer<>(),
- new JsonSerializer<>());
+ (JsonSerializer<String>)keySerde.serializer(),
+ (JsonSerializer<Word>)valueSerde.serializer());
- TestOutputTopic<String, String> out = testDriver.createOutputTopic(
+ TestOutputTopic<Word, WordCount> out = testDriver.createOutputTopic(
OUT,
- new StringDeserializer(),
- new StringDeserializer());
+ (JsonDeserializer<Word>)keySerde.deserializer(),
+ (JsonDeserializer<WordCount>)valueSerde.deserializer());
TestData.writeInputData((key, value) -> in.pipeInput(key, value));
@Value(staticConstructor = "of")
public class Message
{
- String key;
- String value;
+ Word key;
+ WordCount value;
}
static Message[] expectedMessages =
{
Message.of(
- "{\"user\":\"peter\",\"word\":\"Hallo\"}",
- "{\"user\":\"peter\",\"word\":\"Hallo\",\"count\":1}"),
+ Word.of("peter","Hallo"),
+ WordCount.of("peter","Hallo",1)),
Message.of(
- "{\"user\":\"klaus\",\"word\":\"Müsch\"}",
- "{\"user\":\"klaus\",\"word\":\"Müsch\",\"count\":1}"),
+ Word.of("klaus","Müsch"),
+ WordCount.of("klaus","Müsch",1)),
Message.of(
- "{\"user\":\"peter\",\"word\":\"Welt\"}",
- "{\"user\":\"peter\",\"word\":\"Welt\",\"count\":1}"),
+ Word.of("peter","Welt"),
+ WordCount.of("peter","Welt",1)),
Message.of(
- "{\"user\":\"klaus\",\"word\":\"Müsch\"}",
- "{\"user\":\"klaus\",\"word\":\"Müsch\",\"count\":2}"),
+ Word.of("klaus","Müsch"),
+ WordCount.of("klaus","Müsch",2)),
Message.of(
- "{\"user\":\"klaus\",\"word\":\"s\"}",
- "{\"user\":\"klaus\",\"word\":\"s\",\"count\":1}"),
+ Word.of("klaus","s"),
+ WordCount.of("klaus","s",1)),
Message.of(
- "{\"user\":\"peter\",\"word\":\"Boäh\"}",
- "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":1}"),
+ Word.of("peter","Boäh"),
+ WordCount.of("peter","Boäh",1)),
Message.of(
- "{\"user\":\"peter\",\"word\":\"Welt\"}",
- "{\"user\":\"peter\",\"word\":\"Welt\",\"count\":2}"),
+ Word.of("peter","Welt"),
+ WordCount.of("peter","Welt",2)),
Message.of(
- "{\"user\":\"peter\",\"word\":\"Boäh\"}",
- "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":2}"),
+ Word.of("peter","Boäh"),
+ WordCount.of("peter","Boäh",2)),
Message.of(
- "{\"user\":\"klaus\",\"word\":\"s\"}",
- "{\"user\":\"klaus\",\"word\":\"s\",\"count\":2}"),
+ Word.of("klaus","s"),
+ WordCount.of("klaus","s",2)),
Message.of(
- "{\"user\":\"peter\",\"word\":\"Boäh\"}",
- "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":3}"),
+ Word.of("peter","Boäh"),
+ WordCount.of("peter","Boäh",3)),
Message.of(
- "{\"user\":\"klaus\",\"word\":\"s\"}",
- "{\"user\":\"klaus\",\"word\":\"s\",\"count\":3}"),
+ Word.of("klaus","s"),
+ WordCount.of("klaus","s",3)),
};
}