1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.clients.consumer.ConsumerRecord;
9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.producer.Producer;
11 import org.apache.kafka.clients.producer.ProducerRecord;
12 import org.apache.kafka.common.TopicPartition;
13 import org.apache.kafka.common.errors.RecordDeserializationException;
14 import org.apache.kafka.common.errors.WakeupException;
15 import reactor.core.publisher.Flux;
16 import reactor.core.publisher.Mono;
19 import java.util.Collection;
20 import java.util.List;
22 import java.util.UUID;
23 import java.util.stream.IntStream;
27 public class ChatRoomChannel implements Runnable
29 private final String topic;
30 private final Consumer<Integer, ChatRoomTo> consumer;
31 private final Producer<Integer, ChatRoomTo> producer;
32 private final ShardingStrategy shardingStrategy;
33 private final ChatMessageChannel chatMessageChannel;
35 private boolean running;
38 public ChatRoomChannel(
40 Consumer<Integer, ChatRoomTo> consumer,
41 Producer<Integer, ChatRoomTo> producer,
43 ChatMessageChannel chatMessageChannel)
46 "Creating ChatRoomChannel for topic {} with sharding for {} partitions",
50 this.consumer = consumer;
51 this.producer = producer;
52 this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
53 this.chatMessageChannel = chatMessageChannel;
60 consumer.assign(List.of(new TopicPartition(topic, 0)));
68 ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
69 log.info("Fetched {} messages", records.count());
72 catch (WakeupException e)
75 catch (RecordDeserializationException e)
82 Mono<ChatRoomInfo> sendCreateChatRoomRequest(
86 int shard = this.shardingStrategy.selectShard(chatRoomId);
87 ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name, shard);
88 return Mono.create(sink ->
90 ProducerRecord<Integer, ChatRoomTo> record =
96 producer.send(record, ((metadata, exception) ->
100 log.info("Successfully send chreate-request {}", chatRoomTo);
101 sink.success(chatRoomTo.toChatRoomInfo());
107 "Could not create-request for chat-room={}, key={}, timestamp={}, text={}: {}",
113 sink.error(exception);