NEU
authorKai Moritz <kai@juplo.de>
Sat, 15 Apr 2023 08:13:07 +0000 (10:13 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 15 Apr 2023 08:13:51 +0000 (10:13 +0200)
src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java

index 452f26d..3b63833 100644 (file)
@@ -1,18 +1,17 @@
 package de.juplo.kafka.chat.backend.domain;
 
 import lombok.Getter;
-import org.apache.kafka.common.TopicPartition;
 
 
 public class ShardNotOwnedException extends IllegalStateException
 {
   @Getter
-  private final TopicPartition topicPartition;
+  private final int shard;
 
 
-  public ShardNotOwnedException(TopicPartition topicPartition)
+  public ShardNotOwnedException(int shard)
   {
-    super("This instance does not own the shard for " + topicPartition);
-    this.topicPartition = topicPartition;
+    super("This instance does not own the shard " + shard);
+    this.shard = shard;
   }
 }
index a95df54..4460432 100644 (file)
@@ -2,21 +2,31 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.time.Duration;
 import java.time.ZoneId;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 
 @Slf4j
-public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
+public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
 {
   private final ExecutorService executorService;
   private final Consumer<String, MessageTo> consumer;
@@ -24,8 +34,11 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   private final String topic;
   private final ZoneId zoneId;
   // private final long[] offsets; Erst mal immer alles neu einlesen
-  private final ChatHomeLoader[] chatHomeLoaders;
+  private final boolean[] isShardOwned;
   private final Map<UUID, ChatRoom>[] chatRoomMaps;
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  private boolean running;
 
 
   public KafkaChatHomeService(
@@ -47,7 +60,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     // {
     //   this.offsets[i] = 0l;
     // }
-    this.chatHomeLoaders = new ChatHomeLoader[numShards];
+    this.isShardOwned = new boolean[numShards];
     this.chatRoomMaps = new Map[numShards];
   }
 
@@ -55,30 +68,37 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
-    consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
+    try
     {
-      if (!topicPartition.topic().equals(topic))
-      {
-        log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
-        return;
-      }
+      lock.writeLock().lock();
 
-      int partition = topicPartition.partition();
-      long unseenOffset = 0; // offsets[partition];
-
-      log.info(
-          "Loading messages from partition {}: start-offset={} -> current-offset={}",
-          partition,
-          unseenOffset,
-          currentOffset);
-
-      // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
-      consumer.seek(topicPartition, unseenOffset);
-      chatHomeLoaders[partition] = new ChatHomeLoader(
-          producer,
-          currentOffset,
-          zoneId);
-    });
+      consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
+      {
+        if (!topicPartition.topic().equals(topic))
+        {
+          log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
+          return;
+        }
+
+        int partition = topicPartition.partition();
+        long unseenOffset = 0; // offsets[partition];
+
+        log.info(
+            "Loading messages from partition {}: start-offset={} -> current-offset={}",
+            partition,
+            unseenOffset,
+            currentOffset);
+
+        // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
+        consumer.seek(topicPartition, unseenOffset);
+      });
+
+      consumer.resume(partitions);
+    }
+    finally
+    {
+      lock.writeLock().unlock();
+    }
   }
 
   @Override
@@ -105,15 +125,65 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     onPartitionsRevoked(partitions);
   }
 
+  @Override
+  public void run()
+  {
+    consumer.subscribe(List.of(topic));
+
+    running = true;
+
+    try
+    {
+      while (running)
+      {
+        ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
+        log.info("Fetched {} messages", records.count());
+
+        for (ConsumerRecord<String, MessageTo> record : records)
+        {
+
+        }
+      }
+    }
+  }
+
   @Override
   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
   {
-    return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
+    if (lock.readLock().tryLock())
+    {
+      try
+      {
+        return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
+      }
+      finally
+      {
+        lock.readLock().unlock();
+      }
+    }
+    else
+    {
+      throw new ShardNotOwnedException(shard);
+    }
   }
 
   @Override
   public Flux<ChatRoom> getChatRooms(int shard)
   {
-    return Flux.fromStream(chatRoomMaps[shard].values().stream());
+    if (lock.readLock().tryLock())
+    {
+      try
+      {
+        return Flux.fromStream(chatRoomMaps[shard].values().stream());
+      }
+      finally
+      {
+        lock.readLock().unlock();
+      }
+    }
+    else
+    {
+      throw new ShardNotOwnedException(shard);
+    }
   }
 }
index 37c4e50..f036efe 100644 (file)
@@ -5,6 +5,7 @@ import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -21,8 +22,6 @@ public class KafkaChatRoomService implements ChatRoomService
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
-  private volatile MessageHandlingStrategy strategy;
-
 
   public KafkaChatRoomService(
       Producer<String, MessageTo> producer,
@@ -30,7 +29,6 @@ public class KafkaChatRoomService implements ChatRoomService
   {
     this.producer = producer;
     this.tp = tp;
-    this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
   }
 
 
@@ -40,7 +38,36 @@ public class KafkaChatRoomService implements ChatRoomService
     LocalDateTime timestamp,
     String text)
   {
-    return strategy.handleMessage(key, timestamp, text);
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, MessageTo> record =
+          new ProducerRecord<>(
+              tp.topic(),
+              tp.partition(),
+              timestamp.toEpochSecond(zoneOffset),
+              chatRoomId.toString(),
+              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          // On successful send
+          {
+            // Emit new message
+            Message message = new Message(key, metadata.offset(), timestamp, text);
+            kafkaChatRoomService.addMessage(message);
+          }
+
+          sink.success();
+        }
+        else
+        {
+          // On send-failure
+          sink.error(exception);
+        }
+      }));
+    });
   }
 
   /**