Merge branch 'deserialization' into sumup-adder--ohne--stored-offsets wip-merge-deserialization--sumup-adder--ohne-stored-offsets
authorKai Moritz <kai@juplo.de>
Fri, 9 Sep 2022 11:43:58 +0000 (13:43 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 9 Sep 2022 11:43:58 +0000 (13:43 +0200)
* Conflicts:
** src/main/java/de/juplo/kafka/ApplicationConfiguration.java
** src/main/java/de/juplo/kafka/EndlessConsumer.java
** src/test/java/de/juplo/kafka/ApplicationIT.java
** src/test/java/de/juplo/kafka/ApplicationTests.java
** src/test/java/de/juplo/kafka/GenericApplicationTests.java
** src/test/java/de/juplo/kafka/TestRecordHandler.java

1  2 
pom.xml
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/test/java/de/juplo/kafka/ApplicationIT.java
src/test/java/de/juplo/kafka/ApplicationTests.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

diff --cc pom.xml
Simple merge
@@@ -2,7 -2,10 +2,9 @@@ package de.juplo.kafka
  
  import lombok.RequiredArgsConstructor;
  import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.*;
+ import org.apache.kafka.clients.consumer.Consumer;
 -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+ import org.apache.kafka.clients.consumer.ConsumerRecord;
+ import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.common.TopicPartition;
  import org.apache.kafka.common.errors.RecordDeserializationException;
  import org.apache.kafka.common.errors.WakeupException;
  package de.juplo.kafka;
  
 +import lombok.extern.slf4j.Slf4j;
  import org.apache.kafka.clients.producer.ProducerRecord;
 -import org.apache.kafka.common.serialization.LongSerializer;
  import org.apache.kafka.common.serialization.StringSerializer;
  import org.apache.kafka.common.utils.Bytes;
 -import org.springframework.boot.test.context.TestConfiguration;
 -import org.springframework.context.annotation.Bean;
 -import org.springframework.test.context.ContextConfiguration;
 +import org.springframework.beans.factory.annotation.Autowired;
 +
 +import java.util.*;
  import java.util.function.Consumer;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +import static org.assertj.core.api.Assertions.assertThat;
  
  
 -@ContextConfiguration(classes = ApplicationTests.Configuration.class)
 -public class ApplicationTests extends GenericApplicationTests<String, Long>
 +@Slf4j
 +public class ApplicationTests extends GenericApplicationTests<String, String>
  {
 +  @Autowired
 +  StateRepository stateRepository;
 +
 +
    public ApplicationTests()
    {
 -    super(
 -        new RecordGenerator()
 +    super(new ApplicationTestRecrodGenerator());
-     ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
++    ((ApplicationTestRecrodGenerator) recordGenerator).tests = this;
 +  }
 +
 +
 +  static class ApplicationTestRecrodGenerator implements RecordGenerator
 +  {
 +    ApplicationTests tests;
 +
 +    final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
 +    final String[] dieWilden13 =
-         IntStream
-             .range(1, 14)
-             .mapToObj(i -> "seeräuber-" + i)
-             .toArray(i -> new String[i]);
++      IntStream
++        .range(1, 14)
++        .mapToObj(i -> "seeräuber-" + i)
++        .toArray(i -> new String[i]);
 +    final StringSerializer stringSerializer = new StringSerializer();
 +    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
 +
 +    int counter = 0;
 +
 +    Map<String, List<AdderResult>> state;
 +
 +    @Override
 +    public int generate(
-         boolean poisonPills,
-         boolean logicErrors,
-         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
++      boolean poisonPills,
++      boolean logicErrors,
++      Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
 +    {
 +      counter = 0;
 +      state =
-           Arrays
-               .stream(dieWilden13)
-               .collect(Collectors.toMap(
-                   seeräuber -> seeräuber,
-                   seeräuber -> new LinkedList()));
-       int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-       int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
++        Arrays
++          .stream(dieWilden13)
++          .collect(Collectors.toMap(
++            seeräuber -> seeräuber,
++            seeräuber -> new LinkedList()));
++
++      int number[] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
++      int message[] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
 +      int next = 0;
 +
 +      for (int pass = 0; pass < 333; pass++)
 +      {
-         for (int i = 0; i<13; i++)
++        for (int i = 0; i < 13; i++)
          {
 -          final StringSerializer stringSerializer = new StringSerializer();
 -          final LongSerializer longSerializer = new LongSerializer();
 +          String seeräuber = dieWilden13[i];
 +          Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
 +
 +          if (message[i] > number[i])
 +          {
 +            send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
 +            state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
 +            // Pick next number to calculate
-             number[i] = numbers[next++%numbers.length];
++            number[i] = numbers[next++ % numbers.length];
 +            message[i] = 1;
 +            log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
 +          }
 +
 +          Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
 +          send(key, value, fail(logicErrors, pass, counter), messageSender);
 +        }
 +      }
 +
 +      return counter;
 +    }
  
-     boolean fail (boolean logicErrors, int pass, int counter)
++    boolean fail(boolean logicErrors, int pass, int counter)
 +    {
-       return logicErrors && pass > 300 && counter%77 == 0;
++      return logicErrors && pass > 300 && counter % 77 == 0;
 +    }
 +
 +    void send(
-         Bytes key,
-         Bytes value,
-         boolean fail,
-         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
++      Bytes key,
++      Bytes value,
++      boolean fail,
++      Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
 +    {
 +      counter++;
 +
 +      if (fail)
 +      {
 +        value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
 +      }
 +
 +      messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
 +    }
 +
 +    @Override
 +    public boolean canGeneratePoisonPill()
 +    {
 +      return false;
 +    }
  
 -          @Override
 -          public int generate(
 -              boolean poisonPills,
 -              boolean logicErrors,
 -              Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
 +    @Override
 +    public void assertBusinessLogic()
 +    {
-       for (int i=0; i<PARTITIONS; i++)
++      for (int i = 0; i < PARTITIONS; i++)
 +      {
 +        StateDocument stateDocument =
-             tests.stateRepository.findById(Integer.toString(i)).get();
++          tests.stateRepository.findById(Integer.toString(i)).get();
 +
 +        stateDocument
-             .results
-             .entrySet()
-             .stream()
-             .forEach(entry ->
-             {
-               String user = entry.getKey();
-               List<AdderResult> resultsForUser = entry.getValue();
++          .results
++          .entrySet()
++          .stream()
++          .forEach(entry ->
+           {
 -            int i = 0;
++            String user = entry.getKey();
++            List<AdderResult> resultsForUser = entry.getValue();
  
-               for (int j=0; j < resultsForUser.size(); j++)
 -            for (int partition = 0; partition < 10; partition++)
++            for (int j = 0; j < resultsForUser.size(); j++)
+             {
 -              for (int key = 0; key < 10000; key++)
++              if (!(j < state.get(user).size()))
                {
-                 if (!(j < state.get(user).size()))
 -                i++;
 -
 -                Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
 -                if (i == 99977)
--                {
-                   break;
 -                  if (logicErrors)
 -                  {
 -                    value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
 -                  }
 -                  if (poisonPills)
 -                  {
 -                    value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
 -                  }
--                }
--
-                 assertThat(resultsForUser.get(j))
-                     .as("Unexpected results calculation %d of user %s", j, user)
-                     .isEqualTo(state.get(user).get(j));
 -                ProducerRecord<Bytes, Bytes> record =
 -                    new ProducerRecord<>(
 -                        TOPIC,
 -                        partition,
 -                        new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
 -                        value);
 -
 -                messageSender.accept(record);
++                break;
                }
 -            }
 -
 -            return i;
 -          }
 -        });
 -  }
  
-               assertThat(state.get(user))
-                   .as("More results calculated for user %s as expected", user)
-                   .containsAll(resultsForUser);
-             });
++              assertThat(resultsForUser.get(j))
++                .as("Unexpected results calculation %d of user %s", j, user)
++                .isEqualTo(state.get(user).get(j));
++            }
 -  @TestConfiguration
 -  public static class Configuration
 -  {
 -    @Bean
 -    public RecordHandler<String, Long> applicationRecordHandler()
 -    {
 -      return (record) ->
 -      {
 -        if (record.value() == Long.MIN_VALUE)
 -          throw new RuntimeException("BOOM (Logic-Error)!");
 -      };
++            assertThat(state.get(user))
++              .as("More results calculated for user %s as expected", user)
++              .containsAll(resultsForUser);
++          });
 +      }
      }
    }
--}
++}
@@@ -12,11 -11,9 +12,12 @@@ import org.apache.kafka.common.serializ
  import org.apache.kafka.common.utils.Bytes;
  import org.junit.jupiter.api.*;
  import org.springframework.beans.factory.annotation.Autowired;
 +import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 +import org.springframework.boot.autoconfigure.mongo.MongoProperties;
 +import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
  import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
  import org.springframework.boot.test.context.TestConfiguration;
+ import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Import;
  import org.springframework.kafka.test.context.EmbeddedKafka;
  import org.springframework.test.context.TestPropertySource;
@@@ -58,24 -51,16 +58,21 @@@ abstract class GenericApplicationTests<
        @Autowired
        Consumer<ConsumerRecord<K, V>> consumer;
        @Autowired
-       ApplicationProperties properties;
-       @Autowired
-       ExecutorService executor;
+       ApplicationProperties applicationProperties;
        @Autowired
-       RecordHandler<K, V> recordHandler;
 +      MongoClient mongoClient;
 +      @Autowired
 +      MongoProperties mongoProperties;
 +      @Autowired
 +      RebalanceListener rebalanceListener;
 +      @Autowired
+       TestRecordHandler<K, V> recordHandler;
+       @Autowired
+       EndlessConsumer<K, V> endlessConsumer;
  
 -
        KafkaProducer<Bytes, Bytes> testRecordProducer;
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
-       EndlessConsumer<K, V> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
-       Map<TopicPartition, Long> seenOffsets;
-       Set<ConsumerRecord<K, V>> receivedRecords;
  
  
        final RecordGenerator recordGenerator;