import org.springframework.kafka.test.context.EmbeddedKafka;
import java.time.Duration;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_IN;
import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_OUT;
KafkaTemplate<String, String> kafkaTemplate;
@Autowired
Listener listener;
+ @Autowired
+ MessageSplitter messagSplitter;
@BeforeEach
public void clear()
{
- listener.keys.clear();
- listener.words.clear();
+ listener.reset();
}
@Test
void split()
{
- kafkaTemplate.send(TOPIC_IN, "beate", "Hello World!");
+ kafkaTemplate.send(TOPIC_IN, "beate", LORE_IPSUM);
+
+ String[] words = messagSplitter.split(LORE_IPSUM);
Awaitility
- .await("Receive two words")
+ .await("Receive all words")
.atMost(Duration.ofSeconds(5))
.untilAsserted(() ->
- assertThat(listener.words.size())
- .describedAs("Received two words")
- .isEqualTo(2));
-
- assertThat(listener.keys)
- .describedAs("Received unexpected keys")
- .containsExactly("beate", "beate");
- assertThat(listener.words)
+ assertThat(listener.total)
+ .describedAs("Received all words")
+ .isEqualTo(words.length));
+
+ assertThat(listener.words.keySet())
+ .describedAs("Received unexpected key(s)")
+ .containsExactlyInAnyOrder("beate");
+ assertThat(listener.words.get("beate"))
.describedAs("Received unexpected words")
- .containsExactly("Hello", "World");
+ .containsExactly(words);
+ }
+
+ @Test
+ void order()
+ {
+ kafkaTemplate.send(TOPIC_IN, "beate", LORE_IPSUM);
+ kafkaTemplate.send(TOPIC_IN, "peter", LORE_IPSUM);
+ kafkaTemplate.send(TOPIC_IN, "klaus", LORE_IPSUM);
+
+ String[] words = messagSplitter.split(LORE_IPSUM);
+
+ Awaitility
+ .await("Receive all words")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> {
+ assertThat(listener.total)
+ .describedAs("Received all words for beate")
+ .isEqualTo(words.length * 3);
+ });
+
+ assertThat(listener.words.keySet())
+ .describedAs("Received unexpected key(s)")
+ .containsExactlyInAnyOrder("beate", "peter", "klaus");
+ assertThat(listener.words.get("beate"))
+ .describedAs("Received unexpected words for beate")
+ .containsExactly(words);
+ assertThat(listener.words.get("peter"))
+ .describedAs("Received unexpected words for beate")
+ .containsExactly(words);
+ assertThat(listener.words.get("klaus"))
+ .describedAs("Received unexpected words for beate")
+ .containsExactly(words);
}
static class Listener
{
- final List<String> keys = new LinkedList<>();
- final List<String> words = new LinkedList<>();
+ final Map<String, List<String>> words = new HashMap<>();
+ int total = 0;
@KafkaListener(groupId = "peter", topics = TOPIC_OUT)
public void receive(ConsumerRecord<String, String> record)
{
- keys.add(record.key());
+ List<String> words = this.words.get(record.key());
+ if (words == null)
+ {
+ words = new LinkedList<>();
+ this.words.put(record.key(), words);
+ }
words.add(record.value());
+ total++;
+ }
+
+ void reset()
+ {
+ words.clear();
+ total = 0;
}
}
return new Listener();
}
}
+
+
+ final static String LORE_IPSUM =
+ "Lorem ipsum dolor sit amet, consectetur adipisici elit, sed eiusmod " +
+ "tempor incidunt ut labore et dolore magna aliqua. Ut enim ad minim " +
+ "veniam, quis nostrud exercitation ullamco laboris nisi ut aliquid ex " +
+ "ea commodi consequat. Quis aute iure reprehenderit in voluptate velit " +
+ "esse cillum dolore eu fugiat nulla pariatur. Excepteur sint obcaecat " +
+ "cupiditat non proident, sunt in culpa qui officia deserunt mollit anim " +
+ "id est laborum.";
}