counter: 1.1.11 - Added a test, that is based on `TopologyTestDriver`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterApplicationIT.java
index 75f0988..b412fe4 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.wordcount.counter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -44,8 +43,6 @@ public class CounterApplicationIT
        @Autowired
        KafkaTemplate<String, String> kafkaTemplate;
        @Autowired
-       ObjectMapper mapper;
-       @Autowired
        Consumer consumer;
 
 
@@ -63,7 +60,7 @@ public class CounterApplicationIT
 
                await("Expexted converted data")
                                .atMost(Duration.ofSeconds(10))
-                               .untilAsserted(() -> TestData.assertExpectedResult(consumer.received, mapper));
+                               .untilAsserted(() -> TestData.assertExpectedResult(consumer.getReceivedMessages()));
        }
 
 
@@ -73,11 +70,16 @@ public class CounterApplicationIT
                private final List<Message> received = new LinkedList<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-               public void receive(ConsumerRecord<String, String> record)
+               public synchronized void receive(ConsumerRecord<String, String> record)
                {
                        log.debug("Received message: {}", record);
                        received.add(Message.of(record.key(),record.value()));
                }
+
+               synchronized List<Message> getReceivedMessages()
+               {
+                       return received;
+               }
        }
 
        @TestConfiguration