.isTrue();
}
-
@Test
void mixedMessages()
{
.isTrue();
}
+ @Test
+ void unknownMessagesAndPoisonPillsForAllTypes()
+ {
+ send100Messages((key, counter) ->
+ {
+ switch((int) (counter%10))
+ {
+ case 1:
+ return serializeString("BOOM!", "message");
+ case 2:
+ return serializeString("BOOM!", "greeting");
+ case 3:
+ return serializeString("BOOM!", "foo");
+ case 4:
+ return serializeString("BOOM!", "bar");
+ case 5:
+ return serializeAsFooMessage(key);
+ case 6:
+ case 7:
+ return serializeAsGreeting(key);
+ default:
+ return serializeAsClientMessage(key, counter);
+ }
+ });
+
+ await("100 records received")
+ .atMost(Duration.ofSeconds(30))
+ .until(() -> received.size() == 50);
+
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
+ .untilAsserted(() ->
+ {
+ // UNSCHÖN:
+ // Funktioniert nur, weil nach den Nachrichten, die einen
+ // Deserialisierungs-Fehler auslösen noch valide Nachrichten
+ // gelesen werden.
+ // GRUND:
+ // Der MessageHandler sieht die Offsets der Fehlerhaften
+ // Nachrichten nicht!
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
+ });
+
+ assertThat(endlessConsumer.isRunning())
+ .describedAs("Consumer should still be running")
+ .isTrue();
+ }
+
@Test
void commitsCurrentOffsetsOnDeserializationError()
{
Long expected = offsetsToCheck.get(tp) + 1;
log.debug("TEST: Comparing the expected offset of {} for {} to {}", expected, tp, offset);
assertThat(offset)
- .describedAs("Committed offset corresponds to the offset of the consumer")
+ .describedAs("Committed offset corresponds to the offset of the consumer for " + tp)
.isEqualTo(expected);
});
}
return new BytesAndType(serialize(greeting), "greeting");
}
+ BytesAndType serializeAsFooMessage(Integer key)
+ {
+ FooMessage foo = new FooMessage();
+ foo.setClient(key.toString());
+ foo.setTimestamp(System.currentTimeMillis());
+ return new BytesAndType(serialize(foo), "foo");
+ }
+
BytesAndType serializeString(String message, String messageType)
{
return new BytesAndType(new Bytes(message.getBytes()), messageType);