From: Kai Moritz Date: Sat, 22 Apr 2023 10:18:53 +0000 (+0200) Subject: NG X-Git-Tag: rebase--2023-08-18~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f5c4bff6a6103d0415c8b2ac9e4ab7517c04c215;p=demos%2Fkafka%2Fchat NG --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 6091c0c5..15d542a7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -10,5 +10,5 @@ public interface ChatHome { Mono getChatRoom(UUID id); - Flux getChatRooms(); + Flux getChatRooms(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java index bedd0aac..38076800 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java @@ -1,11 +1,12 @@ package de.juplo.kafka.chat.backend.persistence; import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import reactor.core.publisher.Flux; public interface StorageStrategy { - void write(Flux chatroomFlux); + void write(Flux chatroomFlux); Flux read(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 8a53d3c9..94f6fa6b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -3,7 +3,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -30,11 +29,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener private final Consumer consumer; private final ZoneId zoneId; private final int numShards; + private final int bufferSize; + private final Clock clock; private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; private final Map[] chatrooms; - private final KafkaLikeShardingStrategy shardingStrategy; private boolean running; @Getter @@ -46,7 +46,9 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener Producer producer, Consumer consumer, ZoneId zoneId, - int numShards) + int numShards, + int bufferSize, + Clock clock) { log.debug( "Creating ChatMessageChannel for topic {} with {} partitions", @@ -57,6 +59,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener this.producer = producer; this.zoneId = zoneId; this.numShards = numShards; + this.bufferSize = bufferSize; + this.clock = clock; this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; @@ -64,7 +68,6 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener IntStream .range(0, numShards) .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); - this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); } @@ -111,15 +114,13 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener LocalDateTime timestamp, String text) { - int shard = this.shardingStrategy.selectShard(chatRoomId); - TopicPartition tp = new TopicPartition(topic, shard); ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( - tp.topic(), - tp.partition(), + topic, + null, zdt.toEpochSecond(), chatRoomId.toString(), ChatMessageTo.of(key.getUsername(), key.getMessageId(), text)); @@ -261,6 +262,13 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener (ChatMessageTo) record.value(), record.partition()); break; + + default: + log.debug( + "Ignoring message for chat-room {} with offset {}: {}", + chatRoomId, + record.offset(), + record.value()); } nextOffset[record.partition()] = record.offset() + 1; @@ -348,13 +356,4 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { return Mono.justOrEmpty(chatrooms[shard].get(id)); } - - Flux getChatRooms() - { - return Flux.fromStream(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> Integer.valueOf(shard)) - .flatMap(shard -> chatrooms[shard].values().stream())); - } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 8bbc82ec..1c6ae915 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -10,11 +10,15 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.stream.IntStream; @RequiredArgsConstructor @@ -22,12 +26,8 @@ import java.util.UUID; public class ChatRoomChannel implements Runnable { private final String topic; - private final Producer producer; - private final Consumer consumer; - private final ShardingStrategy shardingStrategy; - private final ChatMessageChannel chatMessageChannel; - private final Clock clock; - private final int bufferSize; + private final Consumer consumer; + private final Map chatrooms = new HashMap<>(); private boolean running; @@ -43,12 +43,24 @@ public class ChatRoomChannel implements Runnable { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { - createChatRoom(record.value().toChatRoomInfo()); + switch (record.value().getType()) + { + case CHATROOM_INFO: + createChatRoom((ChatRoomInfoTo) record.value()); + break; + + default: + log.debug( + "Ignoring message for key {} with offset {}: {}", + record.key(), + record.offset(), + record.value()); + } } } catch (WakeupException e) @@ -62,14 +74,14 @@ public class ChatRoomChannel implements Runnable } - void createChatRoom(ChatRoomInfo chatRoomInfo) + void createChatRoom(ChatRoomInfoTo chatRoomInfoTo) + { + ChatRoomInfo chatRoomInfo = chatRoomInfoTo.toChatRoomInfo(); + chatrooms.put(chatRoomInfo.getId(), chatRoomInfo); + } + + Flux getChatRooms() { - UUID id = chatRoomInfo.getId(); - String name = chatRoomInfo.getName(); - int shard = chatRoomInfo.getShard(); - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id); - ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - chatMessageChannel.putChatRoom(chatRoom); + return Flux.fromIterable(chatrooms.values()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java new file mode 100644 index 00000000..f232c782 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java @@ -0,0 +1,42 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.Data; + +import java.util.UUID; + + +@Data +public class ChatRoomInfoTo extends AbstractTo +{ + private String id; + private String name; + private int shard; + + + public ChatRoomInfoTo() + { + super(ToType.CHATROOM_INFO); + } + + + public ChatRoomInfo toChatRoomInfo() + { + return new ChatRoomInfo(UUID.fromString(id), name, shard); + } + + public static ChatRoomInfoTo from(ChatRoom chatRoom) + { + return ChatRoomInfoTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard()); + } + + public static ChatRoomInfoTo of(String id, String name, int shard) + { + ChatRoomInfoTo to = new ChatRoomInfoTo(); + to.id = id; + to.name = name; + to.shard = shard; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 88947a04..ac87aac8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -17,6 +16,7 @@ import java.util.*; public class KafkaChatHome implements ChatHome { private final ShardingStrategy shardingStrategy; + private final ChatRoomChannel chatRoomChannel; private final ChatMessageChannel chatMessageChanel; @@ -35,15 +35,8 @@ public class KafkaChatHome implements ChatHome } @Override - public Flux getChatRooms() + public Flux getChatRooms() { - if (chatMessageChanel.isLoadInProgress()) - { - throw new LoadInProgressException(); - } - else - { - return chatMessageChanel.getChatRooms(); - } + return chatRoomChannel.getChatRooms(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java deleted file mode 100644 index f232c782..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java +++ /dev/null @@ -1,42 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import lombok.Data; - -import java.util.UUID; - - -@Data -public class ChatRoomInfoTo extends AbstractTo -{ - private String id; - private String name; - private int shard; - - - public ChatRoomInfoTo() - { - super(ToType.CHATROOM_INFO); - } - - - public ChatRoomInfo toChatRoomInfo() - { - return new ChatRoomInfo(UUID.fromString(id), name, shard); - } - - public static ChatRoomInfoTo from(ChatRoom chatRoom) - { - return ChatRoomInfoTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard()); - } - - public static ChatRoomInfoTo of(String id, String name, int shard) - { - ChatRoomInfoTo to = new ChatRoomInfoTo(); - to.id = id; - to.name = name; - to.shard = shard; - return to; - } -}