X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTest.java;h=ed93a218cd2ea8080999db231ddb54548f73885d;hb=da3b3c96a862aa1408933b312ca965dad1bbe482;hp=d3ff3b1d3927c5ea54719f177939d9e30d683f22;hpb=2bf77d19d90e7356e1a7c6e13202971fd1b9897b;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTest.java b/src/test/java/de/juplo/kafka/ApplicationTest.java index d3ff3b1..ed93a21 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTest.java +++ b/src/test/java/de/juplo/kafka/ApplicationTest.java @@ -1,21 +1,26 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.test.context.ContextConfiguration; import java.util.Set; import java.util.function.Consumer; +@ContextConfiguration(classes = ApplicationTest.Configuration.class) public class ApplicationTest extends GenericApplicationTest { public ApplicationTest() { super( - new StringSerializer(), - new RecordGenerator<> () + new RecordGenerator() { final StringSerializer stringSerializer = new StringSerializer(); final LongSerializer longSerializer = new LongSerializer(); @@ -25,7 +30,8 @@ public class ApplicationTest extends GenericApplicationTest public void generate( int numberOfMessagesToGenerate, Set poisonPills, - Consumer> messageSender) + Set logicErrors, + Consumer> messageSender) { int i = 0; @@ -36,16 +42,21 @@ public class ApplicationTest extends GenericApplicationTest if (++i > numberOfMessagesToGenerate) return; - Bytes value = - poisonPills.contains(i) - ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) - : new Bytes(longSerializer.serialize(TOPIC, (long)i)); + Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i)); + if (logicErrors.contains(i)) + { + value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE)); + } + if (poisonPills.contains(i)) + { + value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!")); + } - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( TOPIC, partition, - Integer.toString(partition*10+key%2), + new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))), value); messageSender.accept(record); @@ -54,4 +65,20 @@ public class ApplicationTest extends GenericApplicationTest } }); } + + + @TestConfiguration + public static class Configuration + { + @Primary + @Bean + public Consumer> consumer() + { + return (record) -> + { + if (record.value() == Long.MIN_VALUE) + throw new RuntimeException("BOOM (Logic-Error)!"); + }; + } + } }