counter: Fixed a `ConcurentModificationException`-bug
authorKai Moritz <kai@juplo.de>
Sat, 11 Feb 2023 13:27:15 +0000 (14:27 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 12 Feb 2023 13:53:50 +0000 (14:53 +0100)
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java

index b314726..a345935 100644 (file)
@@ -65,7 +65,7 @@ public class CounterApplicationIT
 
                await("Expexted converted data")
                                .atMost(Duration.ofSeconds(10))
-                               .untilAsserted(() -> TestData.assertExpectedResult(consumer.received));
+                               .untilAsserted(() -> TestData.assertExpectedResult(consumer.getReceivedMessages()));
        }
 
 
@@ -76,12 +76,17 @@ public class CounterApplicationIT
                private final ObjectMapper mapper;
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-               public void receive(ConsumerRecord<String, String> record) throws JsonProcessingException
+               public synchronized void receive(ConsumerRecord<String, String> record) throws JsonProcessingException
                {
                        log.debug("Received message: {}", record);
                        Key key = mapper.readValue(record.key(), Key.class);
                        received.add(Message.of(key,record.value()));
                }
+
+               synchronized List<Message> getReceivedMessages()
+               {
+                       return received;
+               }
        }
 
        @TestConfiguration