Springify: ROT - Merge des verschärften Tests aus der Vanilla-Version
authorKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 12:01:07 +0000 (14:01 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 12:01:07 +0000 (14:01 +0200)
* Ohne die Verschärfung des Tests war der Test grün, obwohl die Anwendung
  entgegen den Erwartungen trotz der eingetreuten Poison-Pill _alle_
  Nachrichten gelesen hat.
* Jetzt schlägt der Test wie erwartet fehl.

1  2 
src/test/java/de/juplo/kafka/ApplicationTests.java

@@@ -67,12 -63,13 +67,13 @@@ class ApplicationTest
        @Autowired
        ApplicationProperties properties;
        @Autowired
 -      ExecutorService executor;
 +      EndlessConsumer endlessConsumer;
 +      @Autowired
 +      RecordHandler recordHandler;
  
 -      Consumer<ConsumerRecord<String, Long>> testHandler;
 -      EndlessConsumer<String, Long> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
+       Set<ConsumerRecord<String, Long>> receivedRecords;
  
  
        /** Tests methods */
  
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
 -                              .until(() -> !endlessConsumer.running());
 +                              .untilAsserted(() -> checkSeenOffsetsForProgress());
  
 -              checkSeenOffsetsForProgress();
 -              compareToCommitedOffsets(newOffsets);
 -
 -              endlessConsumer.start();
 -              await("Consumer failed")
 -                              .atMost(Duration.ofSeconds(30))
 -                              .until(() -> !endlessConsumer.running());
 -
 -              checkSeenOffsetsForProgress();
                compareToCommitedOffsets(newOffsets);
+               assertThat(receivedRecords.size())
+                               .describedAs("Received not all sent events")
+                               .isLessThan(100);
        }
  
  
                        newOffsets.put(tp, offset - 1);
                });
  
 -              Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
 +              recordHandler.captureOffsets =
                                record ->
+                               {
++                                      receivedRecords.add(record);
                                        newOffsets.put(
                                                        new TopicPartition(record.topic(), record.partition()),
                                                        record.offset());
 -                                      receivedRecords.add(record);
 -                                      testHandler.accept(record);
+                               };
  
 -              endlessConsumer =
 -                              new EndlessConsumer<>(
 -                                              executor,
 -                                              properties.getClientId(),
 -                                              properties.getTopic(),
 -                                              kafkaConsumer,
 -                                              captureOffsetAndExecuteTestHandler);
 -
                endlessConsumer.start();
        }