From: Kai Moritz Date: Tue, 26 Jul 2022 10:40:45 +0000 (+0200) Subject: Erzeugung von ProducerRecord in Test refaktorisiert X-Git-Tag: endless-stream-consumer-DEPRECATED^2^2^2~1^2~3^2~3 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=221f774959bb0d1e113bdeefd8fdc52ea1291081;p=demos%2Fkafka%2Ftraining Erzeugung von ProducerRecord in Test refaktorisiert --- diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 40dc149..39ff0d7 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -77,7 +77,11 @@ class ApplicationTests @Order(1) // << The poistion pill is not skipped. Hence, this test must run first void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException { - send100Messages(i -> new Bytes(valueSerializer.serialize(TOPIC, i))); + send100Messages((partition, key, counter) -> + { + Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter)); + return new ProducerRecord<>(TOPIC, partition, key, value); + }); await("100 records received") .atMost(Duration.ofSeconds(30)) @@ -100,10 +104,13 @@ class ApplicationTests @Order(2) void commitsOffsetOfErrorForReprocessingOnError() { - send100Messages(counter -> - counter == 77 - ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) - : new Bytes(valueSerializer.serialize(TOPIC, counter))); + send100Messages((partition, key, counter) -> + { + Bytes value = counter == 77 + ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) + : new Bytes(valueSerializer.serialize(TOPIC, counter)); + return new ProducerRecord<>(TOPIC, partition, key, value); + }); await("Consumer failed") .atMost(Duration.ofSeconds(30)) @@ -185,7 +192,12 @@ class ApplicationTests } - void send100Messages(Function messageGenerator) + public interface RecordGenerator + { + public ProducerRecord generate(int partition, String key, long counter); + } + + void send100Messages(RecordGenerator recordGenerator) { long i = 0; @@ -193,14 +205,8 @@ class ApplicationTests { for (int key = 0; key < 10; key++) { - Bytes value = messageGenerator.apply(++i); - ProducerRecord record = - new ProducerRecord<>( - TOPIC, - partition, - Integer.toString(key%2), - value); + recordGenerator.generate(partition, Integer.toString(key%2), ++i); kafkaProducer.send(record, (metadata, e) -> {