import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.*;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.stream.IntStream;
public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
{
private final String topic;
- private final Consumer<String, MessageTo> consumer;
private final Producer<String, MessageTo> producer;
+ private final Consumer<String, MessageTo> consumer;
private final ZoneId zoneId;
private final int numShards;
private final boolean[] isShardOwned;
public ChatMessageChannel(
String topic,
- Consumer<String, MessageTo> consumer,
Producer<String, MessageTo> producer,
+ Consumer<String, MessageTo> consumer,
ZoneId zoneId,
int numShards)
{
}
+ Mono<Message> sendMessage(
+ UUID chatRoomId,
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text)
+ {
+ int shard = this.shardingStrategy.selectShard(chatRoomId);
+ TopicPartition tp = new TopicPartition(topic, shard);
+ ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
+ return Mono.create(sink ->
+ {
+ ProducerRecord<String, MessageTo> record =
+ new ProducerRecord<>(
+ tp.topic(),
+ tp.partition(),
+ zdt.toEpochSecond(),
+ chatRoomId.toString(),
+ MessageTo.of(key.getUsername(), key.getMessageId(), text));
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ // On successful send
+ Message message = new Message(key, metadata.offset(), timestamp, text);
+ log.info("Successfully send message {}", message);
+ sink.success(message);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
+ chatRoomId,
+ key,
+ timestamp,
+ text,
+ exception);
+ sink.error(exception);
+ }
+ }));
+ });
+ }
+
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
@Override
public void run()
{
- consumer.subscribe(List.of(topic));
+ consumer.subscribe(List.of(topic), this);
running = true;
}
catch (WakeupException e)
{
- }
- catch (RecordDeserializationException e)
- {
+ log.info("Received WakeupException, exiting!");
+ running = false;
}
}
+
+ log.info("Exiting normally");
}
void loadMessages(ConsumerRecords<String, MessageTo> records)
return IntStream
.range(0, numShards)
.filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard])
- .collect(
- () -> Boolean.TRUE, // TODO: Boolean is immutable
- (acc, v) -> Boolean.valueOf(acc && v), // TODO: Boolean is immutable
- (a, b) -> Boolean.valueOf(a && b)); // TODO: Boolean is immutable
+ .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
}
void pauseAllOwnedPartions()
.toList());
}
- Mono<Message> sendMessage(
- UUID chatRoomId,
- Message.MessageKey key,
- LocalDateTime timestamp,
- String text)
- {
- int shard = this.shardingStrategy.selectShard(chatRoomId);
- TopicPartition tp = new TopicPartition(topic, shard);
- ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
- return Mono.create(sink ->
- {
- ProducerRecord<String, MessageTo> record =
- new ProducerRecord<>(
- tp.topic(),
- tp.partition(),
- zdt.toEpochSecond(),
- chatRoomId.toString(),
- MessageTo.of(key.getUsername(), key.getMessageId(), text));
-
- producer.send(record, ((metadata, exception) ->
- {
- if (metadata != null)
- {
- // On successful send
- Message message = new Message(key, metadata.offset(), timestamp, text);
- log.info("Successfully send message {}", message);
- sink.success(message);
- }
- else
- {
- // On send-failure
- log.error(
- "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
- chatRoomId,
- key,
- timestamp,
- text,
- exception);
- sink.error(exception);
- }
- }));
- });
- }
-
void putChatRoom(ChatRoom chatRoom)
{
Integer partition = chatRoom.getShard();
UUID chatRoomId = chatRoom.getId();
- ChatRoom existingChatRoom = chatrooms[partition].get(chatRoomId);
- if (existingChatRoom == null)
+ if (chatrooms[partition].containsKey(chatRoomId))
+ {
+ log.warn("Ignoring existing chat-room: " + chatRoom);
+ }
+ else
{
log.info(
- "Creating new chat-room in partition {}: {}",
+ "Adding new chat-room to partition {}: {}",
partition,
chatRoom);
+
chatrooms[partition].put(chatRoomId, chatRoom);
}
- else
- {
- if (chatRoom.getShard() != existingChatRoom.getShard())
- {
- throw new IllegalArgumentException(
- "Could not change the shard of existing chat-room " +
- chatRoomId + " from " +
- existingChatRoom.getShard() + " to " +
- chatRoom.getShard());
- }
- else
- {
- log.info(
- "Updating chat-room in partition {}: {} -> {}",
- partition,
- existingChatRoom,
- chatRoom);
- existingChatRoom.s
- }
- }
}
Mono<ChatRoom> getChatRoom(int shard, UUID id)
return Mono.justOrEmpty(chatrooms[shard].get(id));
}
- Flux<ChatRoom> getChatRooms(int shard)
+ Flux<ChatRoom> getChatRooms()
{
- return Flux.fromStream(chatrooms[shard].values().stream());
+ return Flux.fromStream(IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .mapToObj(shard -> Integer.valueOf(shard))
+ .flatMap(shard -> chatrooms[shard].values().stream()));
}
}