From: Kai Moritz Date: Fri, 18 Aug 2023 14:12:29 +0000 (+0200) Subject: NG X-Git-Tag: rebase--2023-08-18-abends~7 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=58f546c85c496a56f99478e7d230e9c92b9e312f;p=demos%2Fkafka%2Fchat NG --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java deleted file mode 100644 index 5d3f8626..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ /dev/null @@ -1,87 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.*; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.*; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.stream.IntStream; - - -@RequiredArgsConstructor -@Slf4j -public class ChatRoomChannel implements Runnable -{ - private final String topic; - private final Consumer consumer; - private final Map chatrooms = new HashMap<>(); - - private boolean running; - - - @Override - public void run() - { - consumer.assign(List.of(new TopicPartition(topic, 0))); - - running = true; - - while (running) - { - try - { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); - log.info("Fetched {} messages", records.count()); - - for (ConsumerRecord record : records) - { - switch (record.value().getType()) - { - case CHATROOM_INFO: - createChatRoom((ChatRoomInfoTo) record.value()); - break; - - default: - log.debug( - "Ignoring message for key {} with offset {}: {}", - record.key(), - record.offset(), - record.value()); - } - } - } - catch (WakeupException e) - { - log.info("Received WakeupException, exiting!"); - running = false; - } - } - - log.info("Exiting normally"); - } - - - void createChatRoom(ChatRoomInfoTo chatRoomInfoTo) - { - ChatRoomInfo chatRoomInfo = chatRoomInfoTo.toChatRoomInfo(); - chatrooms.put(chatRoomInfo.getId(), chatRoomInfo); - } - - Flux getChatRooms() - { - return Flux.fromIterable(chatrooms.values()); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index 8a9e32ed..d0166c56 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -27,8 +27,6 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Autowired ConfigurableApplicationContext context; - @Autowired - ChatRoomChannel chatRoomChannel; @Autowired Consumer chatRoomChannelConsumer; @Autowired @@ -43,14 +41,6 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Override public void run(ApplicationArguments args) throws Exception { - log.info("Starting the consumer for the ChatRoomChannel"); - chatRoomChannelConsumerJob = taskExecutor - .submitCompletable(chatRoomChannel) - .exceptionally(e -> - { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); - return null; - }); log.info("Starting the consumer for the ChatMessageChannel"); chatMessageChannelConsumerJob = taskExecutor .submitCompletable(chatMessageChannel)