private final ChatMessageService service;
private final Clock clock;
private final int historyLimit;
+
private Sinks.Many<Message> sink;
- private volatile boolean active = false;
+ private volatile boolean active;
public ChatRoomData(
// @RequiredArgsConstructor unfortunately not possible, because
// the `historyLimit` is not set, if `createSink()` is called
// from the variable declaration!
+ this.sink = createSink();
+ this.active = false;
}
{
if (active)
{
- log.info("{} is already active!", service.getChatRoomInfo());
+ log.error("{} is already active!", service.getChatRoomInfo());
return;
}
log.info("{} is being activated", service.getChatRoomInfo());
- this.sink = createSink();
active = true;
}
public void deactivate()
{
+ if (!active)
+ {
+ log.error("{} is already inactive!", service.getChatRoomInfo());
+ return;
+ }
+
log.info("{} is being deactivated", service.getChatRoomInfo());
active = false;
+ }
+
+ public void reset()
+ {
+ ChatRoomInfo chatRoomInfo = service.getChatRoomInfo();
+ if (active)
+ {
+ throw new IllegalStateException("Could not reset active ChatRoomData: " + chatRoomInfo);
+ }
+
+ log.info("Resetting {}", chatRoomInfo);
sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
+ sink = createSink();
}
private Sinks.Many<Message> createSink()
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.*;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.stream.IntStream;
private final Map<UUID, ChatRoomData>[] chatRoomData;
private final ChannelMediator channelMediator;
private final ShardingPublisherStrategy shardingPublisherStrategy;
+ private final List<ChatRoomData> previouslyOwnedChatRoomData = new LinkedList<>();
private boolean running;
@Getter
{
// On successful send
Message message = new Message(key, metadata.offset(), timestamp, text);
- log.info("Successfully send message {}", message);
+ log.info("Successfully sent message {} to chat-room {}", message, chatRoomId);
sink.success(message);
}
else
{
// On send-failure
log.error(
- "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
+ "Could not sent message to chat-room={}, key={}, timestamp={}, text={}: {}",
chatRoomId,
key,
timestamp,
});
consumer.resume(partitions);
+
+ Flux
+ .fromIterable(previouslyOwnedChatRoomData)
+ .filter(chatRoomData -> !isShardOwned[chatRoomData.getChatRoomService().getChatRoomInfo().getShard()])
+ .doOnNext(chatRoomData -> chatRoomData.reset())
+ .then()
+ .doOnSuccess(nothing ->
+ {
+ previouslyOwnedChatRoomData.clear();
+ log.info("Done resetting revoked ChatRoomData");
+ })
+ .block();
}
@Override
chatRoomData[partition]
.values()
- .forEach(chatRoomData -> chatRoomData.deactivate());
+ .forEach(chatRoomData ->
+ {
+ chatRoomData.deactivate();
+ previouslyOwnedChatRoomData.add(chatRoomData);
+ });
channelMediator.shardRevoked(partition);
});
}
else
{
- log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
+ log.info("Creating ChatRoomData {} with history-limit {}", chatRoomInfo, historyLimit);
KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomInfo);
chatRoomData = new ChatRoomData(clock, service, historyLimit);
this.chatRoomData[shard].put(chatRoomId, chatRoomData);
}
@Test
- @DisplayName("Assert, that a listended to chat-room emits completed, if it is deactivated")
- void testListenedToChatRoomEmitsCompletedIfItIsDeactivated()
+ @DisplayName("Assert, that a listened to chat-room emits completed, if it is deactivated and reset")
+ void testListenedToChatRoomEmitsCompletedIfItIsDeactivatedAndActivatedWithReset()
{
// Given
Message message1 = new Message(key, 1l, timestamp, "#1");
// When
Flux<Message> listenFlux = chatRoomData.listen();
chatRoomData.deactivate();
+ chatRoomData.reset();
// Then
Awaitility