@Slf4j
public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
{
- private final Consumer<String, MessageTo> consumer;
- private final Producer<String, MessageTo> producer;
- private final String topic;
+ private final String chatRoomsTopic;
+ private final Consumer<Integer, ChatRoomTo> chatRoomsConsumer;
+ private final Producer<Integer, ChatRoomTo> chatRoomsProducer;
+ private final String chatMessagesTopic;
+ private final Consumer<String, MessageTo> chatMessagesConsumer;
+ private final Producer<String, MessageTo> chatMessagesProducer;
private final ZoneId zoneId;
private final int numShards;
private final boolean[] isShardOwned;
public KafkaChatHomeService(
- Consumer<String, MessageTo> consumer,
- Producer<String, MessageTo> producer,
- String topic,
+ String chatRoomsTopic,
+ Consumer<Integer, ChatRoomTo> chatRoomsConsumer,
+ Producer<Integer, ChatRoomTo> chatRoomsProducer,
+ String chatMessagesTopic,
+ Consumer<String, MessageTo> chatMessagesConsumer,
+ Producer<String, MessageTo> chatMessagesProducer,
ZoneId zoneId,
int numShards)
{
log.debug("Creating KafkaChatHomeService");
- this.consumer = consumer;
- this.producer = producer;
- this.topic = topic;
+ this.chatRoomsTopic = chatRoomsTopic;
+ this.chatRoomsConsumer = chatRoomsConsumer;
+ this.chatRoomsProducer = chatRoomsProducer;
+ this.chatMessagesTopic = chatMessagesTopic;
+ this.chatMessagesConsumer = chatMessagesConsumer;
+ this.chatMessagesProducer = chatMessagesProducer;
this.zoneId = zoneId;
this.numShards = numShards;
this.isShardOwned = new boolean[numShards];
log.info("Newly assigned partitions! Pausing normal operations...");
loadInProgress = true;
- consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
+ chatMessagesConsumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
{
int partition = topicPartition.partition();
isShardOwned[partition] = true;
nextOffset[partition],
currentOffset);
- consumer.seek(topicPartition, nextOffset[partition]);
+ chatMessagesConsumer.seek(topicPartition, nextOffset[partition]);
});
- consumer.resume(partitions);
+ chatMessagesConsumer.resume(partitions);
}
@Override
@Override
public void run()
{
- consumer.subscribe(List.of(topic));
+ chatMessagesConsumer.subscribe(List.of(chatMessagesTopic));
running = true;
{
try
{
- ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, MessageTo> records = chatMessagesConsumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
if (loadInProgress)
void pauseAllOwnedPartions()
{
- consumer.pause(IntStream
+ chatMessagesConsumer.pause(IntStream
.range(0, numShards)
.filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> new TopicPartition(topic, shard))
+ .mapToObj(shard -> new TopicPartition(chatMessagesTopic, shard))
.toList());
}
String text)
{
int shard = this.shardingStrategy.selectShard(chatRoomId);
- TopicPartition tp = new TopicPartition(topic, shard);
+ TopicPartition tp = new TopicPartition(chatMessagesTopic, shard);
ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
return Mono.create(sink ->
{
chatRoomId.toString(),
MessageTo.of(key.getUsername(), key.getMessageId(), text));
- producer.send(record, ((metadata, exception) ->
+ chatMessagesProducer.send(record, ((metadata, exception) ->
{
if (metadata != null)
{
public void putChatRoom(ChatRoom chatRoom)
{
+
+ ProducerRecord<Integer, ChatRoomTo> record = new ProducerRecord<>(chatRoom.getShard(), );
// TODO: Nachricht senden!
chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
}