X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaChatHomeService.java;h=5133d1a68203713f970a5106a73b67de5d1f17e7;hb=035668bee4f02c4c70f43826026b40f81e3dd672;hp=a95df543813f3223895b631d2b40e334309c9c1c;hpb=220a778c91468046054fac0400ba89825c46b3f5;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index a95df543..5133d1a6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -2,21 +2,27 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.time.ZoneId; +import java.time.*; import java.util.*; import java.util.concurrent.ExecutorService; @Slf4j -public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener +public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener { private final ExecutorService executorService; private final Consumer consumer; @@ -24,8 +30,12 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL private final String topic; private final ZoneId zoneId; // private final long[] offsets; Erst mal immer alles neu einlesen - private final ChatHomeLoader[] chatHomeLoaders; + private final boolean[] isShardOwned; private final Map[] chatRoomMaps; + private final KafkaLikeShardingStrategy shardingStrategy; + + private boolean running; + private volatile boolean loadInProgress; public KafkaChatHomeService( @@ -47,14 +57,17 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL // { // this.offsets[i] = 0l; // } - this.chatHomeLoaders = new ChatHomeLoader[numShards]; + this.isShardOwned = new boolean[numShards]; this.chatRoomMaps = new Map[numShards]; + this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); } @Override public void onPartitionsAssigned(Collection partitions) { + loadInProgress = true; + consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> { if (!topicPartition.topic().equals(topic)) @@ -74,11 +87,9 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]! consumer.seek(topicPartition, unseenOffset); - chatHomeLoaders[partition] = new ChatHomeLoader( - producer, - currentOffset, - zoneId); }); + + consumer.resume(partitions); } @Override @@ -105,15 +116,113 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL onPartitionsRevoked(partitions); } + @Override + public void run() + { + consumer.subscribe(List.of(topic)); + + running = true; + + try + { + while (running) + { + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + log.info("Fetched {} messages", records.count()); + + if (loadInProgress) + { + for (ConsumerRecord record : records) + { + UUID chatRoomId = UUID.fromString(record.key()); + MessageTo messageTo = record.value(); + ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId); + Mono result = chatRoom.addMessage( + messageTo.getId(), + messageTo.getUser(), + messageTo.getText()); + result.block(). + } + } + else + { + if (!records.isEmpty()) + { + throw new IllegalStateException("All owned partions should be paused, when no load is in progress!"); + } + } + } + } + } + + Mono sendMessage( + UUID chatRoomId, + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + int shard = this.shardingStrategy.selectShard(chatRoomId); + TopicPartition tp = new TopicPartition(topic, shard); + ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + tp.topic(), + tp.partition(), + zdt.toEpochSecond(), + chatRoomId.toString(), + MessageTo.of(key.getUsername(), key.getMessageId(), text)); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + // On successful send + Message message = new Message(key, metadata.offset(), timestamp, text); + log.info("Successfully send message {}", message); + sink.success(message); + } + else + { + // On send-failure + log.error( + "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", + chatRoomId, + key, + timestamp, + text, + exception); + sink.error(exception); + } + })); + }); + } + + @Override public Mono getChatRoom(int shard, UUID id) { - return Mono.justOrEmpty(chatRoomMaps[shard].get(id)); + if (loadInProgress) + { + throw new ShardNotOwnedException(shard); + } + else + { + return Mono.justOrEmpty(chatRoomMaps[shard].get(id)); + } } @Override public Flux getChatRooms(int shard) { - return Flux.fromStream(chatRoomMaps[shard].values().stream()); + if (loadInProgress) + { + throw new ShardNotOwnedException(shard); + } + else + { + return Flux.fromStream(chatRoomMaps[shard].values().stream()); + } } }