import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
private final Consumer<String, AbstractTo> consumer;
private final ZoneId zoneId;
private final int numShards;
+ private final int bufferSize;
+ private final Clock clock;
private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
private final Map<UUID, ChatRoom>[] chatrooms;
- private final KafkaLikeShardingStrategy shardingStrategy;
private boolean running;
@Getter
Producer<String, AbstractTo> producer,
Consumer<String, AbstractTo> consumer,
ZoneId zoneId,
- int numShards)
+ int numShards,
+ int bufferSize,
+ Clock clock)
{
log.debug(
"Creating ChatMessageChannel for topic {} with {} partitions",
this.producer = producer;
this.zoneId = zoneId;
this.numShards = numShards;
+ this.bufferSize = bufferSize;
+ this.clock = clock;
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
IntStream
.range(0, numShards)
.forEach(shard -> this.chatrooms[shard] = new HashMap<>());
- this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
LocalDateTime timestamp,
String text)
{
- int shard = this.shardingStrategy.selectShard(chatRoomId);
- TopicPartition tp = new TopicPartition(topic, shard);
ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
return Mono.create(sink ->
{
- ProducerRecord<String, ChatMessageTo> record =
+ ProducerRecord<String, AbstractTo> record =
new ProducerRecord<>(
- tp.topic(),
- tp.partition(),
+ topic,
+ null,
zdt.toEpochSecond(),
chatRoomId.toString(),
ChatMessageTo.of(key.getUsername(), key.getMessageId(), text));
(ChatMessageTo) record.value(),
record.partition());
break;
+
+ default:
+ log.debug(
+ "Ignoring message for chat-room {} with offset {}: {}",
+ chatRoomId,
+ record.offset(),
+ record.value());
}
nextOffset[record.partition()] = record.offset() + 1;
{
return Mono.justOrEmpty(chatrooms[shard].get(id));
}
-
- Flux<ChatRoom> getChatRooms()
- {
- return Flux.fromStream(IntStream
- .range(0, numShards)
- .filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> Integer.valueOf(shard))
- .flatMap(shard -> chatrooms[shard].values().stream()));
- }
}