counter: 1.1.8 - Fixed a `ConcurentModificationException`-bug
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterApplicationIT.java
index c6eb0a6..b412fe4 100644 (file)
@@ -60,7 +60,7 @@ public class CounterApplicationIT
 
                await("Expexted converted data")
                                .atMost(Duration.ofSeconds(10))
-                               .untilAsserted(() -> TestData.assertExpectedResult(consumer.received));
+                               .untilAsserted(() -> TestData.assertExpectedResult(consumer.getReceivedMessages()));
        }
 
 
@@ -70,11 +70,16 @@ public class CounterApplicationIT
                private final List<Message> received = new LinkedList<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-               public void receive(ConsumerRecord<String, String> record)
+               public synchronized void receive(ConsumerRecord<String, String> record)
                {
                        log.debug("Received message: {}", record);
                        received.add(Message.of(record.key(),record.value()));
                }
+
+               synchronized List<Message> getReceivedMessages()
+               {
+                       return received;
+               }
        }
 
        @TestConfiguration