{
Mono<ChatRoom> getChatRoom(UUID id);
- Flux<ChatRoom> getChatRooms();
+ Flux<ChatRoomInfo> getChatRooms();
}
package de.juplo.kafka.chat.backend.persistence;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import reactor.core.publisher.Flux;
public interface StorageStrategy
{
- void write(Flux<ChatRoom> chatroomFlux);
+ void write(Flux<ChatRoomInfo> chatroomFlux);
Flux<ChatRoom> read();
}
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
private final Consumer<String, AbstractTo> consumer;
private final ZoneId zoneId;
private final int numShards;
+ private final int bufferSize;
+ private final Clock clock;
private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
private final Map<UUID, ChatRoom>[] chatrooms;
- private final KafkaLikeShardingStrategy shardingStrategy;
private boolean running;
@Getter
Producer<String, AbstractTo> producer,
Consumer<String, AbstractTo> consumer,
ZoneId zoneId,
- int numShards)
+ int numShards,
+ int bufferSize,
+ Clock clock)
{
log.debug(
"Creating ChatMessageChannel for topic {} with {} partitions",
this.producer = producer;
this.zoneId = zoneId;
this.numShards = numShards;
+ this.bufferSize = bufferSize;
+ this.clock = clock;
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
IntStream
.range(0, numShards)
.forEach(shard -> this.chatrooms[shard] = new HashMap<>());
- this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
LocalDateTime timestamp,
String text)
{
- int shard = this.shardingStrategy.selectShard(chatRoomId);
- TopicPartition tp = new TopicPartition(topic, shard);
ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
return Mono.create(sink ->
{
- ProducerRecord<String, ChatMessageTo> record =
+ ProducerRecord<String, AbstractTo> record =
new ProducerRecord<>(
- tp.topic(),
- tp.partition(),
+ topic,
+ null,
zdt.toEpochSecond(),
chatRoomId.toString(),
ChatMessageTo.of(key.getUsername(), key.getMessageId(), text));
(ChatMessageTo) record.value(),
record.partition());
break;
+
+ default:
+ log.debug(
+ "Ignoring message for chat-room {} with offset {}: {}",
+ chatRoomId,
+ record.offset(),
+ record.value());
}
nextOffset[record.partition()] = record.offset() + 1;
{
return Mono.justOrEmpty(chatrooms[shard].get(id));
}
-
- Flux<ChatRoom> getChatRooms()
- {
- return Flux.fromStream(IntStream
- .range(0, numShards)
- .filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> Integer.valueOf(shard))
- .flatMap(shard -> chatrooms[shard].values().stream()));
- }
}
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.*;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.stream.IntStream;
@RequiredArgsConstructor
public class ChatRoomChannel implements Runnable
{
private final String topic;
- private final Producer<Integer, CreateChatRoomRequestTo> producer;
- private final Consumer<Integer, CreateChatRoomRequestTo> consumer;
- private final ShardingStrategy shardingStrategy;
- private final ChatMessageChannel chatMessageChannel;
- private final Clock clock;
- private final int bufferSize;
+ private final Consumer<String, AbstractTo> consumer;
+ private final Map<UUID, ChatRoomInfo> chatrooms = new HashMap<>();
private boolean running;
{
try
{
- ConsumerRecords<Integer, CreateChatRoomRequestTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
- for (ConsumerRecord<Integer, CreateChatRoomRequestTo> record : records)
+ for (ConsumerRecord<String, AbstractTo> record : records)
{
- createChatRoom(record.value().toChatRoomInfo());
+ switch (record.value().getType())
+ {
+ case CHATROOM_INFO:
+ createChatRoom((ChatRoomInfoTo) record.value());
+ break;
+
+ default:
+ log.debug(
+ "Ignoring message for key {} with offset {}: {}",
+ record.key(),
+ record.offset(),
+ record.value());
+ }
}
}
catch (WakeupException e)
}
- void createChatRoom(ChatRoomInfo chatRoomInfo)
+ void createChatRoom(ChatRoomInfoTo chatRoomInfoTo)
+ {
+ ChatRoomInfo chatRoomInfo = chatRoomInfoTo.toChatRoomInfo();
+ chatrooms.put(chatRoomInfo.getId(), chatRoomInfo);
+ }
+
+ Flux<ChatRoomInfo> getChatRooms()
{
- UUID id = chatRoomInfo.getId();
- String name = chatRoomInfo.getName();
- int shard = chatRoomInfo.getShard();
- log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
- KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
- ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
- chatMessageChannel.putChatRoom(chatRoom);
+ return Flux.fromIterable(chatrooms.values());
}
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.Data;
+
+import java.util.UUID;
+
+
+@Data
+public class ChatRoomInfoTo extends AbstractTo
+{
+ private String id;
+ private String name;
+ private int shard;
+
+
+ public ChatRoomInfoTo()
+ {
+ super(ToType.CHATROOM_INFO);
+ }
+
+
+ public ChatRoomInfo toChatRoomInfo()
+ {
+ return new ChatRoomInfo(UUID.fromString(id), name, shard);
+ }
+
+ public static ChatRoomInfoTo from(ChatRoom chatRoom)
+ {
+ return ChatRoomInfoTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
+ }
+
+ public static ChatRoomInfoTo of(String id, String name, int shard)
+ {
+ ChatRoomInfoTo to = new ChatRoomInfoTo();
+ to.id = id;
+ to.name = name;
+ to.shard = shard;
+ return to;
+ }
+}
import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
public class KafkaChatHome implements ChatHome
{
private final ShardingStrategy shardingStrategy;
+ private final ChatRoomChannel chatRoomChannel;
private final ChatMessageChannel chatMessageChanel;
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Flux<ChatRoomInfo> getChatRooms()
{
- if (chatMessageChanel.isLoadInProgress())
- {
- throw new LoadInProgressException();
- }
- else
- {
- return chatMessageChanel.getChatRooms();
- }
+ return chatRoomChannel.getChatRooms();
}
}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import lombok.Data;
-
-import java.util.UUID;
-
-
-@Data
-public class ChatRoomInfoTo extends AbstractTo
-{
- private String id;
- private String name;
- private int shard;
-
-
- public ChatRoomInfoTo()
- {
- super(ToType.CHATROOM_INFO);
- }
-
-
- public ChatRoomInfo toChatRoomInfo()
- {
- return new ChatRoomInfo(UUID.fromString(id), name, shard);
- }
-
- public static ChatRoomInfoTo from(ChatRoom chatRoom)
- {
- return ChatRoomInfoTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
- }
-
- public static ChatRoomInfoTo of(String id, String name, int shard)
- {
- ChatRoomInfoTo to = new ChatRoomInfoTo();
- to.id = id;
- to.name = name;
- to.shard = shard;
- return to;
- }
-}