X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTest.java;h=ed93a218cd2ea8080999db231ddb54548f73885d;hb=657bf71b6c1c99065f26cccf0c3d2a1f30bc9407;hp=81165abd9222fd11957bcb1f1222e3bf49f4cad9;hpb=80f616369c011db99eddf42c6ee91e66fd1dfd07;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTest.java b/src/test/java/de/juplo/kafka/ApplicationTest.java index 81165ab..ed93a21 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTest.java +++ b/src/test/java/de/juplo/kafka/ApplicationTest.java @@ -1,14 +1,20 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.test.context.ContextConfiguration; import java.util.Set; import java.util.function.Consumer; +@ContextConfiguration(classes = ApplicationTest.Configuration.class) public class ApplicationTest extends GenericApplicationTest { public ApplicationTest() @@ -24,6 +30,7 @@ public class ApplicationTest extends GenericApplicationTest public void generate( int numberOfMessagesToGenerate, Set poisonPills, + Set logicErrors, Consumer> messageSender) { int i = 0; @@ -35,10 +42,15 @@ public class ApplicationTest extends GenericApplicationTest if (++i > numberOfMessagesToGenerate) return; - Bytes value = - poisonPills.contains(i) - ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) - : new Bytes(longSerializer.serialize(TOPIC, (long)i)); + Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i)); + if (logicErrors.contains(i)) + { + value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE)); + } + if (poisonPills.contains(i)) + { + value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!")); + } ProducerRecord record = new ProducerRecord<>( @@ -53,4 +65,20 @@ public class ApplicationTest extends GenericApplicationTest } }); } + + + @TestConfiguration + public static class Configuration + { + @Primary + @Bean + public Consumer> consumer() + { + return (record) -> + { + if (record.value() == Long.MIN_VALUE) + throw new RuntimeException("BOOM (Logic-Error)!"); + }; + } + } }