-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
-import org.springframework.test.context.ContextConfiguration;
-
-import java.util.Set;
-import java.util.function.Consumer;
-
-
-@ContextConfiguration(classes = ApplicationTest.Configuration.class)
-public class ApplicationTest extends GenericApplicationTest<String, Long>
-{
- public ApplicationTest()
- {
- super(
- new RecordGenerator()
- {
- final StringSerializer stringSerializer = new StringSerializer();
- final LongSerializer longSerializer = new LongSerializer();
-
-
- @Override
- public void generate(
- int numberOfMessagesToGenerate,
- Set<Integer> poisonPills,
- Set<Integer> logicErrors,
- Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
- {
- int i = 0;
-
- for (int partition = 0; partition < 10; partition++)
- {
- for (int key = 0; key < 10; key++)
- {
- if (++i > numberOfMessagesToGenerate)
- return;
-
- Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
- if (logicErrors.contains(i))
- {
- value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
- }
- if (poisonPills.contains(i))
- {
- value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
- }
-
- ProducerRecord<Bytes, Bytes> record =
- new ProducerRecord<>(
- TOPIC,
- partition,
- new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
- value);
-
- messageSender.accept(record);
- }
- }
- }
- });
- }
-
-
- @TestConfiguration
- public static class Configuration
- {
- @Primary
- @Bean
- public Consumer<ConsumerRecord<String, Long>> consumer()
- {
- return (record) ->
- {
- if (record.value() == Long.MIN_VALUE)
- throw new RuntimeException("BOOM (Logic-Error)!");
- };
- }
- }
-}