private final Producer<String, MessageTo> producer;
private final String topic;
private final ZoneId zoneId;
private final Producer<String, MessageTo> producer;
private final String topic;
private final ZoneId zoneId;
private final Map<UUID, ChatRoom>[] chatRoomMaps;
private final KafkaLikeShardingStrategy shardingStrategy;
private final Map<UUID, ChatRoom>[] chatRoomMaps;
private final KafkaLikeShardingStrategy shardingStrategy;
this.chatRoomMaps = new Map[numShards];
this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
this.chatRoomMaps = new Map[numShards];
this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
- // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
- consumer.seek(topicPartition, unseenOffset);
+ consumer.seek(topicPartition, nextOffset[partition]);
Instant instant = Instant.ofEpochSecond(record.timestamp());
LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
Instant instant = Instant.ofEpochSecond(record.timestamp());
LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
Message message = new Message(key, record.offset(), timestamp, messageTo.getText());