X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=491335a7f86fa047f5bff8a01029cc87173a84b0;hb=41e5f74b40e4a434483dcc4142aaf8224ea5a478;hp=50627dabf99e5ea7477e51454cef4507ab44cc05;hpb=81c08612c6636a04864699233e7806d72e2ecf3a;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 50627da..491335a 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -28,8 +28,7 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static de.juplo.kafka.ApplicationTests.PARTITIONS; -import static de.juplo.kafka.ApplicationTests.TOPIC; +import static de.juplo.kafka.ApplicationTests.*; import static org.assertj.core.api.Assertions.*; import static org.awaitility.Awaitility.*; @@ -38,15 +37,16 @@ import static org.awaitility.Awaitility.*; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestPropertySource( properties = { - "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", - "consumer.topic=" + TOPIC, - "consumer.commit-interval=1s" }) -@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) + "sumup.requests.bootstrap-server=${spring.embedded.kafka.brokers}", + "sumup.requests.topic-in=" + INPUT_TOPIC, + "sumup.requests.commit-interval=1s" }) +@EmbeddedKafka(topics = { INPUT_TOPIC, OUTPUT_TOPIC }, partitions = PARTITIONS) @EnableAutoConfiguration @Slf4j class ApplicationTests { - public static final String TOPIC = "FOO"; + public static final String INPUT_TOPIC = "FOO"; + public static final String OUTPUT_TOPIC = "BAR"; public static final int PARTITIONS = 10; @@ -55,9 +55,9 @@ class ApplicationTests @Autowired Serializer valueSerializer; @Autowired - KafkaProducer kafkaProducer; + KafkaProducer testProducer; @Autowired - KafkaConsumer kafkaConsumer; + KafkaConsumer kafkaConsumer; @Autowired KafkaConsumer offsetConsumer; @Autowired @@ -67,10 +67,10 @@ class ApplicationTests @Autowired RecordHandler noopRecordHandler; - EndlessConsumer endlessConsumer; + EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; - Set> receivedRecords; + Set> receivedRecords; /** Tests methods */ @@ -80,8 +80,8 @@ class ApplicationTests { send100Messages((partition, key, counter) -> { - Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter)); - return new ProducerRecord<>(TOPIC, partition, key, value); + Bytes value = new Bytes(valueSerializer.serialize(INPUT_TOPIC, counter)); + return new ProducerRecord<>(INPUT_TOPIC, partition, key, value); }); await("100 records received") @@ -109,9 +109,9 @@ class ApplicationTests send100Messages((partition, key, counter) -> { Bytes value = counter == 77 - ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) - : new Bytes(valueSerializer.serialize(TOPIC, counter)); - return new ProducerRecord<>(TOPIC, partition, key, value); + ? new Bytes(stringSerializer.serialize(INPUT_TOPIC, "BOOM!")) + : new Bytes(valueSerializer.serialize(INPUT_TOPIC, counter)); + return new ProducerRecord<>(INPUT_TOPIC, partition, key, value); }); await("Consumer failed") @@ -206,19 +206,19 @@ class ApplicationTests return IntStream .range(0, PARTITIONS) - .mapToObj(partition -> new TopicPartition(TOPIC, partition)) + .mapToObj(partition -> new TopicPartition(INPUT_TOPIC, partition)) .collect(Collectors.toList()); } public interface RecordGenerator { - public ProducerRecord generate(int partition, String key, long counter); + public ProducerRecord generate(int partition, String key, int counter); } void send100Messages(RecordGenerator recordGenerator) { - long i = 0; + int i = 0; for (int partition = 0; partition < 10; partition++) { @@ -227,7 +227,7 @@ class ApplicationTests ProducerRecord record = recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i); - kafkaProducer.send(record, (metadata, e) -> + testProducer.send(record, (metadata, e) -> { if (metadata != null) { @@ -267,10 +267,10 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(noopRecordHandler) { + TestRecordHandler captureOffsetAndExecuteTestHandler = + new TestRecordHandler(noopRecordHandler) { @Override - public void onNewRecord(ConsumerRecord record) + public void onNewRecord(ConsumerRecord record) { newOffsets.put( new TopicPartition(record.topic(), record.partition()), @@ -283,7 +283,7 @@ class ApplicationTests new EndlessConsumer<>( executor, properties.getClientId(), - properties.getTopic(), + properties.getTopicIn(), kafkaConsumer, captureOffsetAndExecuteTestHandler); @@ -309,13 +309,13 @@ class ApplicationTests public static class Configuration { @Bean - Serializer serializer() + Serializer valueSerializer() { - return new LongSerializer(); + return new IntegerSerializer(); } @Bean - KafkaProducer kafkaProducer(ApplicationProperties properties) + KafkaProducer testProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -338,5 +338,16 @@ class ApplicationTests return new KafkaConsumer<>(props); } + + @Bean + KafkaProducer kafkaProducer(ApplicationProperties properties) + { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + + return new KafkaProducer<>(props); + } } }