NEU
authorKai Moritz <kai@juplo.de>
Sat, 15 Apr 2023 09:03:49 +0000 (11:03 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 15 Apr 2023 09:03:49 +0000 (11:03 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java

index 4460432..5133d1a 100644 (file)
@@ -2,27 +2,23 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.time.Duration;
-import java.time.ZoneId;
+import java.time.*;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 
 @Slf4j
@@ -36,9 +32,10 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
   // private final long[] offsets; Erst mal immer alles neu einlesen
   private final boolean[] isShardOwned;
   private final Map<UUID, ChatRoom>[] chatRoomMaps;
-  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private final KafkaLikeShardingStrategy shardingStrategy;
 
   private boolean running;
+  private volatile boolean loadInProgress;
 
 
   public KafkaChatHomeService(
@@ -62,43 +59,37 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
     // }
     this.isShardOwned = new boolean[numShards];
     this.chatRoomMaps = new Map[numShards];
+    this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
   }
 
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
-    try
-    {
-      lock.writeLock().lock();
+    loadInProgress = true;
 
-      consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
+    consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
+    {
+      if (!topicPartition.topic().equals(topic))
       {
-        if (!topicPartition.topic().equals(topic))
-        {
-          log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
-          return;
-        }
+        log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
+        return;
+      }
 
-        int partition = topicPartition.partition();
-        long unseenOffset = 0; // offsets[partition];
+      int partition = topicPartition.partition();
+      long unseenOffset = 0; // offsets[partition];
 
-        log.info(
-            "Loading messages from partition {}: start-offset={} -> current-offset={}",
-            partition,
-            unseenOffset,
-            currentOffset);
+      log.info(
+          "Loading messages from partition {}: start-offset={} -> current-offset={}",
+          partition,
+          unseenOffset,
+          currentOffset);
 
-        // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
-        consumer.seek(topicPartition, unseenOffset);
-      });
+      // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
+      consumer.seek(topicPartition, unseenOffset);
+    });
 
-      consumer.resume(partitions);
-    }
-    finally
-    {
-      lock.writeLock().unlock();
-    }
+    consumer.resume(partitions);
   }
 
   @Override
@@ -139,51 +130,99 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
         ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
         log.info("Fetched {} messages", records.count());
 
-        for (ConsumerRecord<String, MessageTo> record : records)
+        if (loadInProgress)
         {
-
+          for (ConsumerRecord<String, MessageTo> record : records)
+          {
+            UUID chatRoomId = UUID.fromString(record.key());
+            MessageTo messageTo = record.value();
+            ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
+            Mono<Message> result = chatRoom.addMessage(
+                messageTo.getId(),
+                messageTo.getUser(),
+                messageTo.getText());
+            result.block().
+          }
+        }
+        else
+        {
+          if (!records.isEmpty())
+          {
+            throw new IllegalStateException("All owned partions should be paused, when no load is in progress!");
+          }
         }
       }
     }
   }
 
+  Mono<Message> sendMessage(
+      UUID chatRoomId,
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text)
+  {
+    int shard = this.shardingStrategy.selectShard(chatRoomId);
+    TopicPartition tp = new TopicPartition(topic, shard);
+    ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, MessageTo> record =
+          new ProducerRecord<>(
+              tp.topic(),
+              tp.partition(),
+              zdt.toEpochSecond(),
+              chatRoomId.toString(),
+              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          // On successful send
+          Message message = new Message(key, metadata.offset(), timestamp, text);
+          log.info("Successfully send message {}", message);
+          sink.success(message);
+        }
+        else
+        {
+          // On send-failure
+          log.error(
+              "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
+              chatRoomId,
+              key,
+              timestamp,
+              text,
+              exception);
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+
+
   @Override
   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
   {
-    if (lock.readLock().tryLock())
+    if (loadInProgress)
     {
-      try
-      {
-        return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
-      }
-      finally
-      {
-        lock.readLock().unlock();
-      }
+      throw new ShardNotOwnedException(shard);
     }
     else
     {
-      throw new ShardNotOwnedException(shard);
+      return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
     }
   }
 
   @Override
   public Flux<ChatRoom> getChatRooms(int shard)
   {
-    if (lock.readLock().tryLock())
+    if (loadInProgress)
     {
-      try
-      {
-        return Flux.fromStream(chatRoomMaps[shard].values().stream());
-      }
-      finally
-      {
-        lock.readLock().unlock();
-      }
+      throw new ShardNotOwnedException(shard);
     }
     else
     {
-      throw new ShardNotOwnedException(shard);
+      return Flux.fromStream(chatRoomMaps[shard].values().stream());
     }
   }
 }
index f036efe..ed155df 100644 (file)
@@ -3,71 +3,34 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
 import java.util.LinkedHashMap;
+import java.util.UUID;
 
 
+@RequiredArgsConstructor
 @Slf4j
 public class KafkaChatRoomService implements ChatRoomService
 {
-  private final Producer<String, MessageTo> producer;
-  private final TopicPartition tp;
+  private final KafkaChatHomeService kafkaChatHomeService;
+  private final UUID chatRoomId;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
 
-  public KafkaChatRoomService(
-      Producer<String, MessageTo> producer,
-      TopicPartition tp)
-  {
-    this.producer = producer;
-    this.tp = tp;
-  }
-
-
   @Override
-  synchronized public Mono<Message> persistMessage(
+  public Mono<Message> persistMessage(
     Message.MessageKey key,
     LocalDateTime timestamp,
     String text)
   {
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, MessageTo> record =
-          new ProducerRecord<>(
-              tp.topic(),
-              tp.partition(),
-              timestamp.toEpochSecond(zoneOffset),
-              chatRoomId.toString(),
-              MessageTo.of(key.getUsername(), key.getMessageId(), text));
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          // On successful send
-          {
-            // Emit new message
-            Message message = new Message(key, metadata.offset(), timestamp, text);
-            kafkaChatRoomService.addMessage(message);
-          }
-
-          sink.success();
-        }
-        else
-        {
-          // On send-failure
-          sink.error(exception);
-        }
-      }));
-    });
+    return kafkaChatHomeService.sendMessage(chatRoomId, key, timestamp, text);
   }
 
   /**