From 1f324b5c26b6924a17e410dffd038f15c1e462b3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 15 Sep 2023 09:41:30 +0200 Subject: [PATCH] FIX --- .../kafka/ConsumerTaskExecutor.java | 2 +- .../kafka/ConsumerTaskRunner.java | 4 +- .../chat/backend/KafkaConfigurationIT.java | 70 +++++++++++-------- 3 files changed, 44 insertions(+), 32 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java index 9ebc26b2..881dd293 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java @@ -46,7 +46,7 @@ public class ConsumerTaskExecutor } - interface WorkAssignor + public interface WorkAssignor { void assignWork(Consumer consumer); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java index 0f433007..233e8f63 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java @@ -11,13 +11,13 @@ public class ConsumerTaskRunner private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; - void run() + public void run() { infoChannelConsumerTaskExecutor.executeConsumerTask(); dataChannelConsumerTaskExecutor.executeConsumerTask(); } - void joinConsumerTasks() + public void joinConsumerTasks() { dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index 630dc630..1170ac0d 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -1,7 +1,6 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel; -import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesApplicationRunner; +import de.juplo.kafka.chat.backend.implementation.kafka.*; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -9,8 +8,12 @@ import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; @@ -26,6 +29,7 @@ import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC; @SpringBootTest( webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { + "spring.main.allow-bean-definition-overriding=true", "chat.backend.services=kafka", "chat.backend.kafka.client-id-PREFIX=TEST", "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", @@ -51,31 +55,25 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @BeforeAll public static void sendAndLoadStoredData( @Autowired KafkaTemplate messageTemplate, - @Autowired Consumer chatRoomChannelConsumer, - @Autowired ThreadPoolTaskExecutor taskExecutor, - @Autowired DataChannel dataChannel) + @Autowired ConsumerTaskRunner consumerTaskRunner) { - send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "command_create_chatroom"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); + send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - List assignedPartitions = List.of(new TopicPartition(INFO_TOPIC, 2)); - chatRoomChannelConsumer.assign(assignedPartitions); - dataChannel.onPartitionsAssigned(assignedPartitions); - CONSUMER_JOB = taskExecutor - .submitCompletable(dataChannel) - .exceptionally(e -> - { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); - return null; - }); + consumerTaskRunner.run(); } - static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) + static void send( + KafkaTemplate kafkaTemplate, + String topic, + String key, + String value, + String typeId) { - ProducerRecord record = new ProducerRecord<>(INFO_TOPIC, key, value); + ProducerRecord record = new ProducerRecord<>(topic, key, value); record.headers().add("__TypeId__", typeId.getBytes()); SendResult result = kafkaTemplate.send(record).join(); log.info( @@ -84,14 +82,28 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT value, new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); } - @AfterAll - static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer) + static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner) + { + consumerTaskRunner.joinConsumerTasks(); + } + + + @TestConfiguration + @EnableConfigurationProperties(ChatBackendProperties.class) + @Import(KafkaServicesConfiguration.class) + static class KafkaChatHomeTestConfiguration { - log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatRoomChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - CONSUMER_JOB.join(); - log.info("Joined the consumer of the ChatRoomChannel"); + @Bean + ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(DataChannel dataChannel) + { + return consumer -> + { + List assignedPartitions = + List.of(new TopicPartition(DATA_TOPIC, 2)); + consumer.assign(assignedPartitions); + dataChannel.onPartitionsAssigned(assignedPartitions); + }; + } } } -- 2.20.1