import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.*;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.stream.IntStream;
public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
{
private final String topic;
- private final Consumer<String, MessageTo> consumer;
private final Producer<String, MessageTo> producer;
+ private final Consumer<String, MessageTo> consumer;
private final ZoneId zoneId;
private final int numShards;
private final boolean[] isShardOwned;
public ChatMessageChannel(
String topic,
- Consumer<String, MessageTo> consumer,
Producer<String, MessageTo> producer,
+ Consumer<String, MessageTo> consumer,
ZoneId zoneId,
int numShards)
{
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
this.chatrooms = new Map[numShards];
+ IntStream
+ .range(0, numShards)
+ .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
@Override
public void run()
{
- consumer.subscribe(List.of(topic));
+ consumer.subscribe(List.of(topic), this);
running = true;
running = false;
}
}
+
+ log.info("Exiting normally");
}
void loadMessages(ConsumerRecords<String, MessageTo> records)
ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
if (chatRoom == null)
{
- // Alles pausieren und erst von putChatRoom wieder resumen lassen!
+ // TODO: Alles pausieren und erst von putChatRoom wieder resumen lassen!
}
KafkaChatRoomService kafkaChatRoomService =
(KafkaChatRoomService) chatRoom.getChatRoomService();
return IntStream
.range(0, numShards)
.filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard])
- .collect(
- () -> Boolean.TRUE, // TODO: Boolean is immutable
- (acc, v) -> Boolean.valueOf(acc && v), // TODO: Boolean is immutable
- (a, b) -> Boolean.valueOf(a && b)); // TODO: Boolean is immutable
+ .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
}
void pauseAllOwnedPartions()
return Mono.justOrEmpty(chatrooms[shard].get(id));
}
- Flux<ChatRoom> getChatRooms(int shard)
+ Flux<ChatRoom> getChatRooms()
{
- return Flux.fromStream(chatrooms[shard].values().stream());
+ return Flux.fromStream(IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .mapToObj(shard -> Integer.valueOf(shard))
+ .flatMap(shard -> chatrooms[shard].values().stream()));
}
}