public class CounterApplicationConfiguriation
{
@Bean
- public Properties streamProcessorProperties(CounterApplicationProperties counterProperties)
+ public Properties streamProcessorProperties(
+ CounterApplicationProperties counterProperties)
{
- Properties propertyMap = new Properties();
+ Properties propertyMap = serializationConfig();
propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
+ propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
+ if (counterProperties.getCommitInterval() != null)
+ propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
+ if (counterProperties.getCacheMaxBytes() != null)
+ propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, counterProperties.getCacheMaxBytes());
+ propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ return propertyMap;
+ }
+
+ static Properties serializationConfig()
+ {
+ Properties propertyMap = new Properties();
+
propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
 propertyMap.put(JsonDeserializer.TYPE_MAPPINGS,
 "word:" + Word.class.getName() + "," +
 "counter:" + WordCounter.class.getName());
propertyMap.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
- propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
- if (counterProperties.getCommitInterval() != null)
- propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
- if (counterProperties.getCacheMaxBytes() != null)
- propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, counterProperties.getCacheMaxBytes());
- propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propertyMap;
}
package de.juplo.kafka.wordcount.counter;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.NoArgsConstructor;
-@AllArgsConstructor(staticName = "of")
-@NoArgsConstructor
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Word
package de.juplo.kafka.wordcount.counter;
+import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class WordCounter
{
String user;
package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.springframework.util.MultiValueMap;
import java.time.Duration;
-import java.util.stream.Stream;
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.properties.spring.json.use.type.headers=false",
- "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.counter.Word",
- "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.WordCounter",
+ "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.TestOutputWord",
+ "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
public static final String TOPIC_OUT = "out";
@Autowired
- KafkaTemplate<String, Word> kafkaTemplate;
+ KafkaTemplate<String, TestInputWord> kafkaTemplate;
@Autowired
Consumer consumer;
@Test
void testSendMessage()
{
- Stream
- .of(TestData.INPUT_MESSAGES)
+ TestData
+ .getInputMessages()
.forEach(word -> kafkaTemplate.send(TOPIC_IN, word.getUser(), word));
await("Expected messages")
static class Consumer
{
- private final MultiValueMap<Word, WordCounter> received = new LinkedMultiValueMap<>();
+ private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(
- @Header(KafkaHeaders.RECEIVED_KEY) Word word,
- @Payload WordCounter counter)
+ @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
+ @Payload TestOutputWordCounter counter)
{
log.debug("Received message: {} -> {}", word, counter);
received.add(word, counter);
}
- synchronized MultiValueMap<Word, WordCounter> getReceivedMessages()
+ synchronized MultiValueMap<TestOutputWord, TestOutputWordCounter> getReceivedMessages()
{
return received;
}
package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Stream;
-
-import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
TopologyTestDriver testDriver;
- TestInputTopic<String, Word> in;
- TestOutputTopic<Word, WordCounter> out;
+ TestInputTopic<String, TestInputWord> in;
+ TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
@BeforeEach
OUT,
Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
- CounterApplicationConfiguriation applicationConfiguriation =
- new CounterApplicationConfiguriation();
- Properties streamProcessorProperties =
- applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
- Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
-
- JsonSerde<?> keySerde = new JsonSerde<>();
- keySerde.configure(propertyMap, true);
- JsonSerde<?> valueSerde = new JsonSerde<>();
- valueSerde.configure(propertyMap, false);
-
- testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+ testDriver = new TopologyTestDriver(topology, serializationConfig());
in = testDriver.createInputTopic(
IN,
- (JsonSerializer<String>)keySerde.serializer(),
- (JsonSerializer<Word>)valueSerde.serializer());
+ new StringSerializer(),
+ new JsonSerializer().noTypeInfo());
out = testDriver.createOutputTopic(
OUT,
- (JsonDeserializer<Word>)keySerde.deserializer(),
- (JsonDeserializer<WordCounter>)valueSerde.deserializer());
+ new JsonDeserializer()
+ .copyWithType(TestOutputWord.class)
+ .ignoreTypeHeaders(),
+ new JsonDeserializer()
+ .copyWithType(TestOutputWordCounter.class)
+ .ignoreTypeHeaders());
}
@Test
public void test()
{
- Stream
- .of(TestData.INPUT_MESSAGES)
+ TestData
+ .getInputMessages()
.forEach(word -> in.pipeInput(word.getUser(), word));
- MultiValueMap<Word, WordCounter> receivedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
out
.readRecordsToList()
.forEach(record ->
package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
class TestData
{
- static final Word[] INPUT_MESSAGES = new Word[]
+ private static final TestInputWord[] INPUT_MESSAGES = new TestInputWord[]
{
- Word.of("peter","Hallo"),
- Word.of("klaus","Müsch"),
- Word.of("peter","Welt"),
- Word.of("klaus","Müsch"),
- Word.of("klaus","s"),
- Word.of("peter","Boäh"),
- Word.of("peter","Welt"),
- Word.of("peter","Boäh"),
- Word.of("klaus","s"),
- Word.of("peter","Boäh"),
- Word.of("klaus","s"),
+ TestInputWord.of("peter","Hallo"),
+ TestInputWord.of("klaus","Müsch"),
+ TestInputWord.of("peter","Welt"),
+ TestInputWord.of("klaus","Müsch"),
+ TestInputWord.of("klaus","s"),
+ TestInputWord.of("peter","Boäh"),
+ TestInputWord.of("peter","Welt"),
+ TestInputWord.of("peter","Boäh"),
+ TestInputWord.of("klaus","s"),
+ TestInputWord.of("peter","Boäh"),
+ TestInputWord.of("klaus","s"),
};
- static void assertExpectedMessages(MultiValueMap<Word, WordCounter> receivedMessages)
+ static Stream<TestInputWord> getInputMessages()
+ {
+ return Stream.of(TestData.INPUT_MESSAGES);
+ }
+
+ static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
{
expectedMessages().forEach(
(word, counter) ->
.containsExactlyElementsOf(counter));
}
- static final KeyValue<Word, WordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
+ private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair(
- Word.of("peter","Hallo"),
- WordCounter.of("peter","Hallo",1)),
+ TestOutputWord.of("peter","Hallo"),
+ TestOutputWordCounter.of("peter","Hallo",1)),
KeyValue.pair(
- Word.of("klaus","Müsch"),
- WordCounter.of("klaus","Müsch",1)),
+ TestOutputWord.of("klaus","Müsch"),
+ TestOutputWordCounter.of("klaus","Müsch",1)),
KeyValue.pair(
- Word.of("peter","Welt"),
- WordCounter.of("peter","Welt",1)),
+ TestOutputWord.of("peter","Welt"),
+ TestOutputWordCounter.of("peter","Welt",1)),
KeyValue.pair(
- Word.of("klaus","Müsch"),
- WordCounter.of("klaus","Müsch",2)),
+ TestOutputWord.of("klaus","Müsch"),
+ TestOutputWordCounter.of("klaus","Müsch",2)),
KeyValue.pair(
- Word.of("klaus","s"),
- WordCounter.of("klaus","s",1)),
+ TestOutputWord.of("klaus","s"),
+ TestOutputWordCounter.of("klaus","s",1)),
KeyValue.pair(
- Word.of("peter","Boäh"),
- WordCounter.of("peter","Boäh",1)),
+ TestOutputWord.of("peter","Boäh"),
+ TestOutputWordCounter.of("peter","Boäh",1)),
KeyValue.pair(
- Word.of("peter","Welt"),
- WordCounter.of("peter","Welt",2)),
+ TestOutputWord.of("peter","Welt"),
+ TestOutputWordCounter.of("peter","Welt",2)),
KeyValue.pair(
- Word.of("peter","Boäh"),
- WordCounter.of("peter","Boäh",2)),
+ TestOutputWord.of("peter","Boäh"),
+ TestOutputWordCounter.of("peter","Boäh",2)),
KeyValue.pair(
- Word.of("klaus","s"),
- WordCounter.of("klaus","s",2)),
+ TestOutputWord.of("klaus","s"),
+ TestOutputWordCounter.of("klaus","s",2)),
KeyValue.pair(
- Word.of("peter","Boäh"),
- WordCounter.of("peter","Boäh",3)),
+ TestOutputWord.of("peter","Boäh"),
+ TestOutputWordCounter.of("peter","Boäh",3)),
KeyValue.pair(
- Word.of("klaus","s"),
- WordCounter.of("klaus","s",3)),
+ TestOutputWord.of("klaus","s"),
+ TestOutputWordCounter.of("klaus","s",3)),
};
- static MultiValueMap<Word, WordCounter> expectedMessages()
+ static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
{
- MultiValueMap<Word, WordCounter> expectedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
Stream
.of(EXPECTED_MESSAGES)
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
return expectedMessages;
}
- static Map<String, Object> convertToMap(Properties properties)
- {
- return properties
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- entry -> (String)entry.getKey(),
- entry -> entry.getValue()
- ));
- }
-
static String parseHeader(Headers headers, String key)
{
Header header = headers.lastHeader(key);
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestInputWord
+{
+ String user;
+ String word;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWord
+{
+ String user;
+ String word;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWordCounter
+{
+ String user;
+ String word;
+ long counter;
+}