private final Producer<String, MessageTo> producer;
private final String topic;
private final ZoneId zoneId;
- // private final long[] offsets; Erst mal immer alles neu einlesen
+ private final int numShards;
private final boolean[] isShardOwned;
+ private final long[] currentOffset;
+ private final long[] nextOffset;
private final Map<UUID, ChatRoom>[] chatRoomMaps;
private final KafkaLikeShardingStrategy shardingStrategy;
this.producer = producer;
this.topic = topic;
this.zoneId = zoneId;
- // this.offsets = new long[numShards];
- // for (int i=0; i< numShards; i++)
- // {
- // this.offsets[i] = 0l;
- // }
+ this.numShards = numShards;
this.isShardOwned = new boolean[numShards];
+ this.currentOffset = new long[numShards];
+ this.nextOffset = new long[numShards];
this.chatRoomMaps = new Map[numShards];
this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
{
- if (!topicPartition.topic().equals(topic))
- {
- log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
- return;
- }
-
int partition = topicPartition.partition();
- long unseenOffset = 0; // offsets[partition];
+ isShardOwned[partition] = true;
+ this.currentOffset[partition] = currentOffset;
log.info(
- "Loading messages from partition {}: start-offset={} -> current-offset={}",
+ "Partition assigned: {} - loading messages: next={} -> current={}",
partition,
- unseenOffset,
+ nextOffset[partition],
currentOffset);
- // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
- consumer.seek(topicPartition, unseenOffset);
+ consumer.seek(topicPartition, nextOffset[partition]);
});
consumer.resume(partitions);
{
partitions.forEach(topicPartition ->
{
- if (!topicPartition.topic().equals(topic))
- {
- log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
- return;
- }
-
int partition = topicPartition.partition();
- // long unseenOffset = offsets[partition]; TODO: Offset merken...?
+ isShardOwned[partition] = false;
+ log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
});
- log.info("Revoked partitions: {}", partitions);
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions)
{
+ log.warn("Lost partitions: {}, partitions");
// TODO: Muss auf den Verlust anders reagiert werden?
onPartitionsRevoked(partitions);
}
{
UUID chatRoomId = UUID.fromString(record.key());
MessageTo messageTo = record.value();
- ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
- KafkaChatRoomService kafkaChatRoomService =
- (KafkaChatRoomService) chatRoom.getChatRoomService();
+
Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
+
Instant instant = Instant.ofEpochSecond(record.timestamp());
LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+
Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
+
+ ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
+ KafkaChatRoomService kafkaChatRoomService =
+ (KafkaChatRoomService) chatRoom.getChatRoomService();
+
kafkaChatRoomService.persistMessage(message);
}
}
{
if (!records.isEmpty())
{
- throw new IllegalStateException("All owned partions should be paused, when no load is in progress!");
+ throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
}
}
}