</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
- <version>1.2.7</version>
+ <version>1.2.8</version>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.*;
import static org.awaitility.Awaitility.*;
static final int PARTITIONS = 2;
@Autowired
- KafkaTemplate<String, String> kafkaTemplate;
+ KafkaTemplate<String, Word> kafkaTemplate;
@Autowired
Consumer consumer;
@TestConfiguration
static class Configuration
{
+ @Bean
+ ProducerFactory<?, ?> producerFactory(Properties streamProcessorProperties)
+ {
+ Map<String, Object> propertyMap = streamProcessorProperties
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ entry -> (String)entry.getKey(),
+ entry -> entry.getValue()
+ ));
+
+ propertyMap.put(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ JsonSerializer.class.getName());
+ propertyMap.put(
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ JsonSerializer.class.getName());
+
+ return new DefaultKafkaProducerFactory<>(propertyMap);
+ }
+
@Bean
Consumer consumer()
{
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.List;
import java.util.Properties;
TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
- TestInputTopic<String, String> in = testDriver.createInputTopic(
+ TestInputTopic<String, Word> in = testDriver.createInputTopic(
IN,
- new StringSerializer(),
- new StringSerializer());
+ new JsonSerializer<>(),
+ new JsonSerializer<>());
TestOutputTopic<String, String> out = testDriver.createOutputTopic(
OUT,
class TestData
{
- static void writeInputData(BiConsumer<String, String> consumer)
+ static void writeInputData(BiConsumer<String, Word> consumer)
{
consumer.accept(
"peter",
- "{\"user\":\"peter\",\"word\":\"Hallo\"}");
+ Word.of("peter","Hallo"));
consumer.accept(
"klaus",
- "{\"user\":\"klaus\",\"word\":\"Müsch\"}");
+ Word.of("klaus","Müsch"));
consumer.accept(
"peter",
- "{\"user\":\"peter\",\"word\":\"Welt\"}");
+ Word.of("peter","Welt"));
consumer.accept(
"klaus",
- "{\"user\":\"klaus\",\"word\":\"Müsch\"}");
+ Word.of("klaus","Müsch"));
consumer.accept(
"klaus",
- "{\"user\":\"klaus\",\"word\":\"s\"}");
+ Word.of("klaus","s"));
consumer.accept(
"peter",
- "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+ Word.of("peter","Boäh"));
consumer.accept(
"peter",
- "{\"user\":\"peter\",\"word\":\"Welt\"}");
+ Word.of("peter","Welt"));
consumer.accept(
"peter",
- "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+ Word.of("peter","Boäh"));
consumer.accept(
"klaus",
- "{\"user\":\"klaus\",\"word\":\"s\"}");
+ Word.of("klaus","s"));
consumer.accept(
"peter",
- "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+ Word.of("peter","Boäh"));
consumer.accept(
"klaus",
- "{\"user\":\"klaus\",\"word\":\"s\"}");
+ Word.of("klaus","s"));
}
static void assertExpectedResult(List<Message> receivedMessages)