-FROM openjdk:11-jre-slim
+FROM eclipse-temurin:17-jre
COPY target/*.jar /opt/app.jar
EXPOSE 8086
ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>3.0.2</version>
+ <version>3.2.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>splitter</artifactId>
- <version>1.1.1</version>
+ <version>1.2.0</version>
<name>Wordcount-Splitter</name>
<description>Stream-processor of the multi-user wordcount-example, that splits the sentences up into single words</description>
<properties>
- <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
+ <docker-maven-plugin.version>0.44.0</docker-maven-plugin.version>
</properties>
<dependencies>
<dependency>
<build>
<plugins>
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
@SpringBootApplication
-@EnableConfigurationProperties(SplitterApplicationProperties.class)
public class SplitterApplication
{
public static void main(String[] args)
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Configuration
+@EnableConfigurationProperties(SplitterApplicationProperties.class)
+@Slf4j
+public class SplitterApplicationConfiguration
+{
+ @Bean(initMethod = "start", destroyMethod = "stop")
+ public SplitterStreamProcessor streamProcessor(
+ SplitterApplicationProperties properties,
+ Properties streamProcessorProperties,
+ ConfigurableApplicationContext context)
+ {
+ SplitterStreamProcessor streamProcessor = new SplitterStreamProcessor(
+ properties.getInputTopic(),
+ properties.getOutputTopic(),
+ streamProcessorProperties);
+
+ streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
+ {
+ log.error("Unexpected error!", e);
+ CompletableFuture.runAsync(() ->
+ {
+ log.info("Stopping application...");
+ SpringApplication.exit(context, () -> 1);
+ });
+ return SHUTDOWN_CLIENT;
+ });
+
+ return streamProcessor;
+ }
+
+ @Bean
+ public Properties streamProcessorProperties(
+ SplitterApplicationProperties applicationProperties)
+ {
+ Properties properties = serializationConfig();
+
+ properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationProperties.getApplicationId());
+ properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer());
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ return properties;
+ }
+
+ static Properties serializationConfig()
+ {
+ Properties properties = new Properties();
+
+ properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ properties.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
+ properties.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Recording.class.getName());
+ properties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
+
+ return properties;
+ }
+}
package de.juplo.kafka.wordcount.splitter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
-import org.springframework.boot.SpringApplication;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.stereotype.Component;
-import jakarta.annotation.PostConstruct;
-import jakarta.annotation.PreDestroy;
import java.util.Arrays;
import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
-
@Slf4j
-@Component
public class SplitterStreamProcessor
{
final static Pattern PATTERN = Pattern.compile("[^\\p{IsAlphabetic}]+");
public SplitterStreamProcessor(
- SplitterApplicationProperties properties,
- ConfigurableApplicationContext context)
+ String inputTopic,
+ String outputTopic,
+ Properties properties)
+ {
+ Topology topology = buildTopology(inputTopic, outputTopic);
+ streams = new KafkaStreams(topology, properties);
+ }
+
+ static Topology buildTopology(
+ String inputTopic,
+ String outputTopic)
{
StreamsBuilder builder = new StreamsBuilder();
- KStream<String, Recording> source = builder.stream(properties.getInputTopic());
+ KStream<User, Recording> source = builder.stream(inputTopic);
source
.flatMapValues(recording -> Arrays
.stream(PATTERN.split(recording.getSentence()))
.map(word -> Word.of(recording.getUser(), word))
.toList())
- .to(properties.getOutputTopic());
+ .to(outputTopic);
- Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- props.put(JsonDeserializer.TRUSTED_PACKAGES, Recording.class.getName() );
- props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Recording.class.getName());
- props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
- streams = new KafkaStreams(builder.build(), props);
- streams.setUncaughtExceptionHandler((Throwable e) ->
- {
- log.error("Unexpected error!", e);
- CompletableFuture.runAsync(() ->
- {
- log.info("Stopping application...");
- SpringApplication.exit(context, () -> 1);
- });
- return SHUTDOWN_CLIENT;
- });
+ return topology;
}
- @PostConstruct
public void start()
{
log.info("Starting Stream-Processor");
streams.start();
}
- @PreDestroy
public void stop()
{
log.info("Stopping Stream-Processor");
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class User
+{
+ String user;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputUser
+{
+ String user;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWord
+{
+ String user;
+ String word;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.recorder;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestInputRecording
+{
+ String user;
+ String sentence;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.recorder;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestInputUser
+{
+ String user;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import de.juplo.kafka.wordcount.counter.TestOutputUser;
+import de.juplo.kafka.wordcount.counter.TestOutputWord;
+import de.juplo.kafka.wordcount.recorder.TestInputRecording;
+import de.juplo.kafka.wordcount.recorder.TestInputUser;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.time.Duration;
+
+import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_IN;
+import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_OUT;
+import static org.awaitility.Awaitility.await;
+
+
+@SpringBootTest(
+ properties = {
+ "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+ "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+ "spring.kafka.producer.properties.spring.json.add.type.headers=false",
+ "spring.kafka.consumer.auto-offset-reset=earliest",
+ "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+ "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+ "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.counter.TestOutputUser",
+ "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestOutputWord",
+ "logging.level.root=WARN",
+ "logging.level.de.juplo=DEBUG",
+ "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.wordcount.splitter.input-topic=" + TOPIC_IN,
+ "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT })
+@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
+@Slf4j
+public class SplitterApplicationIT
+{
+ public final static String TOPIC_IN = "in";
+ public final static String TOPIC_OUT = "out";
+
+ @Autowired
+ Consumer consumer;
+
+ @BeforeAll
+ public static void testSendMessage(
+ @Autowired KafkaTemplate<TestInputUser, TestInputRecording> kafkaTemplate)
+ {
+ TestData
+ .getInputMessages()
+ .forEach(kv ->
+ {
+ try
+ {
+ SendResult<TestInputUser, TestInputRecording> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ log.info(
+ "Sent: {}={}, partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+
+ @Test
+ void testSendMessage() throws Exception
+ {
+ await("Expected converted data")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() ->
+ TestData.assertExpectedMessages(consumer.getReceivedMessages()));
+ }
+
+
+ static class Consumer
+ {
+ private final MultiValueMap<TestOutputUser, TestOutputWord> received = new LinkedMultiValueMap<>();
+
+ @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
+ public synchronized void receive(
+ @Header(KafkaHeaders.RECEIVED_KEY) TestOutputUser key,
+ @Payload TestOutputWord value)
+ {
+ log.debug("Received message: {}={}", key, value);
+ received.add(key, value);
+ }
+
+ synchronized MultiValueMap<TestOutputUser, TestOutputWord> getReceivedMessages()
+ {
+ return received;
+ }
+ }
+
+
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ Consumer consumer()
+ {
+ return new Consumer();
+ }
+ }
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.splitter;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.time.Duration;
-
-import static de.juplo.kafka.wordcount.splitter.SplitterApplicationTests.*;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.awaitility.Awaitility.*;
-
-
-@SpringBootTest(
- properties = {
- "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.splitter.input-topic=" + TOPIC_IN,
- "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT })
-@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS)
-@Slf4j
-public class SplitterApplicationTests
-{
- public final static String TOPIC_IN = "in";
- public final static String TOPIC_OUT = "out";
- static final int PARTITIONS = 2;
-
- @Autowired
- KafkaTemplate<String, String> kafkaTemplate;
- @Autowired
- ObjectMapper mapper;
- @Autowired
- Consumer consumer;
-
-
- @BeforeEach
- public void clear()
- {
- consumer.received.clear();
- }
-
-
- @Test
- void testSendMessage() throws Exception
- {
- Recording recording = new Recording();
- recording.setUser("peter");
- recording.setSentence("Hallo Welt!");
- kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording));
- recording.setUser("klaus");
- recording.setSentence("Müsch gäb's auch!");
- kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording));
- recording.setUser("peter");
- recording.setSentence("Boäh, echt! ß mal nä Nümmäh!");
- kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording));
-
- String peter1 = mapper.writeValueAsString(Word.of("peter", "Hallo"));
- String peter2 = mapper.writeValueAsString(Word.of("peter", "Welt"));
- String peter3 = mapper.writeValueAsString(Word.of("peter", "Boäh"));
- String peter4 = mapper.writeValueAsString(Word.of("peter", "echt"));
- String peter5 = mapper.writeValueAsString(Word.of("peter", "ß"));
- String peter6 = mapper.writeValueAsString(Word.of("peter", "mal"));
- String peter7 = mapper.writeValueAsString(Word.of("peter", "nä"));
- String peter8 = mapper.writeValueAsString(Word.of("peter", "Nümmäh"));
-
- String klaus1 = mapper.writeValueAsString(Word.of("klaus","Müsch"));
- String klaus2 = mapper.writeValueAsString(Word.of("klaus","gäb"));
- String klaus3 = mapper.writeValueAsString(Word.of("klaus","s"));
- String klaus4 = mapper.writeValueAsString(Word.of("klaus","auch"));
-
- await("Expexted converted data")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() ->
- {
- assertThat(consumer.received).hasSize(2);
- assertThat(consumer.received.get("klaus")).containsExactly(klaus1, klaus2, klaus3, klaus4);
- assertThat(consumer.received.get("peter")).containsExactly(peter1, peter2, peter3, peter4, peter5, peter6, peter7, peter8);
- });
- }
-
-
- static class Consumer
- {
- final MultiValueMap<String, String> received = new LinkedMultiValueMap<>();
-
- @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
- public void receive(ConsumerRecord<String, String> record)
- {
- log.debug("Received message: {}", record);
- received.add(record.key(), record.value());
- }
- }
-
- @TestConfiguration
- static class Configuration
- {
- @Bean
- Consumer consumer()
- {
- return new Consumer();
- }
- }
-}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import de.juplo.kafka.wordcount.counter.TestOutputUser;
+import de.juplo.kafka.wordcount.counter.TestOutputWord;
+import de.juplo.kafka.wordcount.recorder.TestInputRecording;
+import de.juplo.kafka.wordcount.recorder.TestInputUser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import static de.juplo.kafka.wordcount.splitter.SplitterApplicationConfiguration.serializationConfig;
+
+
+@Slf4j
+public class SplitterStreamProcessorTopologyTest
+{
+ public final static String IN = "TEST-IN";
+ public final static String OUT = "TEST-OUT";
+
+
+ TopologyTestDriver testDriver;
+ TestInputTopic<TestInputUser, TestInputRecording> in;
+ TestOutputTopic<TestOutputUser, TestOutputWord> out;
+
+
+ @BeforeEach
+ public void setUp()
+ {
+ testDriver = new TopologyTestDriver(
+ SplitterStreamProcessor.buildTopology(IN, OUT),
+ serializationConfig());
+
+ in = testDriver.createInputTopic(
+ IN,
+ new JsonSerializer().noTypeInfo(),
+ new JsonSerializer().noTypeInfo());
+
+ out = testDriver.createOutputTopic(
+ OUT,
+ new JsonDeserializer().copyWithType(TestOutputUser.class),
+ new JsonDeserializer().copyWithType(TestOutputWord.class));
+ }
+
+
+ @Test
+ public void test()
+ {
+ TestData
+ .getInputMessages()
+ .forEach(kv -> in.pipeInput(kv.key, kv.value));
+
+ MultiValueMap<TestOutputUser, TestOutputWord> receivedMessages = new LinkedMultiValueMap<>();
+ out
+ .readRecordsToList()
+ .forEach(record -> receivedMessages.add(record.key(), record.value()));
+
+ TestData.assertExpectedMessages(receivedMessages);
+ }
+
+ @AfterEach
+ public void tearDown()
+ {
+ testDriver.close();
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import de.juplo.kafka.wordcount.counter.TestOutputUser;
+import de.juplo.kafka.wordcount.counter.TestOutputWord;
+import de.juplo.kafka.wordcount.recorder.TestInputRecording;
+import de.juplo.kafka.wordcount.recorder.TestInputUser;
+import org.apache.kafka.streams.KeyValue;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.time.Duration;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+
+public class TestData
+{
+ static final TestInputUser PETER = TestInputUser.of("peter");
+ static final TestInputUser KLAUS = TestInputUser.of("klaus");
+
+
+ static final Stream<KeyValue<TestInputUser, TestInputRecording>> getInputMessages()
+ {
+ return Stream.of(INPUT_MESSAGES);
+ }
+
+ private static final KeyValue<TestInputUser, TestInputRecording>[] INPUT_MESSAGES = new KeyValue[]
+ {
+ new KeyValue<>(
+ PETER,
+ TestInputRecording.of(PETER.getUser(), "Hallo Welt!")),
+ new KeyValue<>(
+ KLAUS,
+ TestInputRecording.of(KLAUS.getUser(), "Müsch gäb's auch!")),
+ new KeyValue<>(
+ PETER,
+ TestInputRecording.of(PETER.getUser(), "Boäh, echt! ß mal nä Nümmäh!")),
+ };
+
+ static void assertExpectedMessages(MultiValueMap<TestOutputUser, TestOutputWord> receivedMessages)
+ {
+ await("Received expected messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> expectedMessages().forEach((user, word) ->
+ assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word)));
+ }
+
+ private static final KeyValue<TestOutputUser, TestOutputWord>[] EXPECTED_MESSAGES = new KeyValue[]
+ {
+ KeyValue.pair(
+ TestOutputUser.of(PETER.getUser()),
+ TestOutputWord.of(PETER.getUser(), "Hallo")),
+ KeyValue.pair(
+ TestOutputUser.of(PETER.getUser()),
+ TestOutputWord.of(PETER.getUser(), "Welt")),
+ KeyValue.pair(
+ TestOutputUser.of(KLAUS.getUser()),
+ TestOutputWord.of(KLAUS.getUser(), "Müsch")),
+ KeyValue.pair(
+ TestOutputUser.of(KLAUS.getUser()),
+ TestOutputWord.of(KLAUS.getUser(), "gäb")),
+ KeyValue.pair(
+ TestOutputUser.of(KLAUS.getUser()),
+ TestOutputWord.of(KLAUS.getUser(), "s")),
+ KeyValue.pair(
+ TestOutputUser.of(KLAUS.getUser()),
+ TestOutputWord.of(KLAUS.getUser(), "auch")),
+ KeyValue.pair(
+ TestOutputUser.of(PETER.getUser()),
+ TestOutputWord.of(PETER.getUser(), "Boäh")),
+ KeyValue.pair(
+ TestOutputUser.of(PETER.getUser()),
+ TestOutputWord.of(PETER.getUser(), "echt")),
+ KeyValue.pair(
+ TestOutputUser.of(PETER.getUser()),
+ TestOutputWord.of(PETER.getUser(), "ß")),
+ KeyValue.pair(
+ TestOutputUser.of(PETER.getUser()),
+ TestOutputWord.of(PETER.getUser(), "mal")),
+ KeyValue.pair(
+ TestOutputUser.of(PETER.getUser()),
+ TestOutputWord.of(PETER.getUser(), "nä")),
+ KeyValue.pair(
+ TestOutputUser.of(PETER.getUser()),
+ TestOutputWord.of(PETER.getUser(), "Nümmäh")),
+ };
+
+ static MultiValueMap<TestOutputUser, TestOutputWord> expectedMessages()
+ {
+ MultiValueMap<TestOutputUser, TestOutputWord> expectedMessages = new LinkedMultiValueMap<>();
+ Stream
+ .of(EXPECTED_MESSAGES)
+ .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
+ return expectedMessages;
+ }
+}