import reactor.core.publisher.Mono;
import java.time.*;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.stream.IntStream;
public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
{
private final String topic;
- private final Producer<String, MessageTo> producer;
- private final Consumer<String, MessageTo> consumer;
+ private final Producer<String, AbstractTo> producer;
+ private final Consumer<String, AbstractTo> consumer;
private final ZoneId zoneId;
private final int numShards;
private final boolean[] isShardOwned;
public ChatMessageChannel(
String topic,
- Producer<String, MessageTo> producer,
- Consumer<String, MessageTo> consumer,
+ Producer<String, AbstractTo> producer,
+ Consumer<String, AbstractTo> consumer,
ZoneId zoneId,
int numShards)
{
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
this.chatrooms = new Map[numShards];
+ IntStream
+ .range(0, numShards)
+ .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
return Mono.create(sink ->
{
- ProducerRecord<String, MessageTo> record =
+ ProducerRecord<String, ChatMessageTo> record =
new ProducerRecord<>(
tp.topic(),
tp.partition(),
zdt.toEpochSecond(),
chatRoomId.toString(),
- MessageTo.of(key.getUsername(), key.getMessageId(), text));
+ ChatMessageTo.of(key.getUsername(), key.getMessageId(), text));
producer.send(record, ((metadata, exception) ->
{
@Override
public void run()
{
- consumer.subscribe(List.of(topic));
+ consumer.subscribe(List.of(topic), this);
running = true;
{
try
{
- ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
if (loadInProgress)
{
- loadMessages(records);
+ loadChatRoom(records);
if (isLoadingCompleted())
{
running = false;
}
}
+
+ log.info("Exiting normally");
}
- void loadMessages(ConsumerRecords<String, MessageTo> records)
+ void loadChatRoom(ConsumerRecords<String, AbstractTo> records)
{
- for (ConsumerRecord<String, MessageTo> record : records)
+ for (ConsumerRecord<String, AbstractTo> record : records)
{
- nextOffset[record.partition()] = record.offset() + 1;
- UUID chatRoomId = UUID.fromString(record.key());
- MessageTo messageTo = record.value();
+ switch (record.value().getType())
+ {
+ case CREATE_CHATROOM_REQUEST:
+ createChatRoom((CreateChatRoomRequestTo) record.value());
+ break;
+
+ case MESSAGE_SENT:
+ UUID chatRoomId = UUID.fromString(record.key());
+ Instant instant = Instant.ofEpochSecond(record.timestamp());
+ LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+ loadChatMessage(
+ chatRoomId,
+ timestamp,
+ record.offset(),
+ (ChatMessageTo) record.value(),
+ record.partition());
+ break;
+ }
- Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
+ nextOffset[record.partition()] = record.offset() + 1;
+ }
+ }
- Instant instant = Instant.ofEpochSecond(record.timestamp());
- LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+ void createChatRoom(
+ CreateChatRoomRequestTo createChatRoomRequestTo,
+ int partition)
+ {
+ chatrooms[partition].put
+ }
- Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
+ void loadChatMessage(
+ UUID chatRoomId,
+ LocalDateTime timestamp,
+ long offset,
+ ChatMessageTo chatMessageTo,
+ int partition)
+ {
+ Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
+ Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
- ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
- if (chatRoom == null)
- {
- // Alles pausieren und erst von putChatRoom wieder resumen lassen!
- }
- KafkaChatRoomService kafkaChatRoomService =
- (KafkaChatRoomService) chatRoom.getChatRoomService();
+ ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
+ KafkaChatRoomService kafkaChatRoomService =
+ (KafkaChatRoomService) chatRoom.getChatRoomService();
- kafkaChatRoomService.persistMessage(message);
- }
+ kafkaChatRoomService.persistMessage(message);
}
boolean isLoadingCompleted()
return IntStream
.range(0, numShards)
.filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard])
- .collect(
- () -> Boolean.TRUE, // TODO: Boolean is immutable
- (acc, v) -> Boolean.valueOf(acc && v), // TODO: Boolean is immutable
- (a, b) -> Boolean.valueOf(a && b)); // TODO: Boolean is immutable
+ .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
}
void pauseAllOwnedPartions()