1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12 import org.apache.kafka.common.errors.RecordDeserializationException;
13 import org.apache.kafka.common.errors.WakeupException;
14 import reactor.core.publisher.Mono;
17 import java.util.List;
18 import java.util.UUID;
/**
 * Channel for chat-room records on a single Kafka topic: holds the consumer that is
 * polled for {@code ChatRoomTo} records, the producer used to publish create-requests,
 * and the factory that is invoked for every consumed record.
 *
 * NOTE(review): this listing has gaps (brace lines and some statements are not shown).
 * {@code log} is used by the methods of this class, so a logger declaration presumably
 * sits on a hidden line -- confirm.
 */
// Lombok generates the constructor from the final fields below; `running` is not final
// and therefore is not a constructor parameter.
21 @RequiredArgsConstructor
23 public class ChatRoomChannel implements Runnable
// Topic name; used to build the TopicPartition (partition 0) assigned to the consumer.
25 private final String topic;
// Consumer that is polled for ChatRoomTo records.
26 private final Consumer<String, ChatRoomTo> consumer;
// Producer used to send ChatRoomTo create-requests.
27 private final Producer<String, ChatRoomTo> producer;
// Called with the id and name of every consumed record.
28 private final ChatRoomFactory chatRoomFactory;
// No read or write of this flag is visible in this excerpt; presumably it controls the
// polling loop -- TODO confirm.
// NOTE(review): plain (non-volatile) boolean. If it is set from a thread other than the
// one executing the loop, visibility is not guaranteed -- confirm and consider volatile.
30 private boolean running;
// Consumer side of the channel. Presumably the body of Runnable.run(); the method
// signature, the loop/try headers and the catch bodies are on lines not shown here.

// Manually assigns partition 0 of the topic to the consumer.
36 consumer.assign(List.of(new TopicPartition(topic, 0)));
// Waits up to five minutes for records.
// NOTE(review): no import of java.time.Duration is visible in this excerpt -- confirm it
// exists on a hidden line.
44 ConsumerRecords<String, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
45 log.info("Fetched {} messages", records.count());
// Every fetched record is handed to the factory as (id, name).
47 for (ConsumerRecord<String, ChatRoomTo> record : records)
// NOTE(review): record.value() is dereferenced without a null check; a record without a
// value would cause a NullPointerException here -- confirm whether such records can
// occur on this topic.
49 UUID id = record.value().getId();
50 String name = record.value().getName();
51 chatRoomFactory.createChatRoom(id, name);
// WakeupException signals that a blocking consumer call was interrupted via
// consumer.wakeup(); the handling code is not visible here.
54 catch (WakeupException e)
// Raised when a fetched record cannot be deserialized; the handling code is not visible
// here.
57 catch (RecordDeserializationException e)
/**
 * Sends a create-request for a chat room through the producer and reports the outcome
 * of the send callback through the returned Mono: it emits the {@code ChatRoomInfo}
 * derived from the request on the success path and signals the callback's exception on
 * the failure path.
 *
 * The send is issued inside {@code Mono.create}, i.e. by the emitter callback and not
 * directly by this method call.
 *
 * The parameter list is not shown in this excerpt; the body uses {@code chatRoomId} and
 * {@code name} (presumably a UUID and a String -- confirm).
 */
63 Mono<ChatRoomInfo> sendCreateChatRoomRequest(
// Transfer object that is sent as the record value.
67 ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name);
68 return Mono.create(sink ->
70 ProducerRecord<String, ChatRoomTo> record =
// String form of the chat-room id; presumably the record key (the remaining constructor
// arguments are on lines not shown).
73 chatRoomId.toString(),
// Asynchronous send; the callback receives the record metadata and/or an exception.
// The condition that selects between the two paths below is on a line not shown.
76 producer.send(record, ((metadata, exception) ->
// Success path.
// NOTE(review): the message contains typos ("send chreate-request" was probably meant
// to read "sent create-request"); left untouched here because it is runtime text.
80 log.info("Successfully send chreate-request for chat room: {}", chatRoomTo);
81 sink.success(chatRoomTo.toChatRoomInfo());
// Failure path: format string of a log call whose level and three arguments are on
// lines not shown.
87 "Could not send create-request for chat room (id={}, name={}): {}",
// Propagates the send failure to the subscriber of the Mono.
91 sink.error(exception);