Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
default void write(ChatHomeService chatHomeService)
+ {
+ write(
+ chatHomeService,
+ this::logSuccessChatHomeService,
+ this::logFailureChatHomeService);
+ }
+
+ default void write(
+ ChatHomeService chatHomeService,
+ ChatHomeServiceWrittenSuccessCallback successCallback,
+ ChatHomeServiceWrittenFailureCallback failureCallback)
{
writeChatRoomInfo(
chatHomeService
.getChatRoomInfo()
+ .doOnComplete(() -> successCallback.accept(chatHomeService))
+ .doOnError(throwable -> failureCallback.accept(chatHomeService, throwable))
.doOnNext(chatRoomInfo ->
writeChatRoomData(
chatRoomInfo.getId(),
void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
Flux<Message> readChatRoomData(UUID chatRoomId);
+ interface ChatHomeServiceWrittenSuccessCallback extends Consumer<ChatHomeService> {}
+ interface ChatHomeServiceWrittenFailureCallback extends BiConsumer<ChatHomeService, Throwable> {}
+
+ default void logSuccessChatHomeService(ChatHomeService chatHomeService)
+ {
+ log.info("Successfully stored {}", chatHomeService);
+ }
+
+ default void logFailureChatHomeService(ChatHomeService chatHomeService, Throwable throwable)
+ {
+ log.error("Could not store {}: {}", chatHomeService, throwable);
+ }
+
interface ChatRoomWrittenSuccessCallback extends Consumer<UUID> {}
interface ChatRoomWrittenFailureCallback extends BiConsumer<UUID, Throwable> {}
for (int shard = 0; shard < chatHomes.length; shard++)
if(chatHomes[shard] != null)
this.ownedShards.add(shard);
- log.info(
- "Created ShardedChatHome for shards: {}",
- ownedShards
- .stream()
- .map(String::valueOf)
- .collect(Collectors.joining(", ")));
+ log.info("Created {}", this);
}
{
return shardingStrategy.selectShard(chatroomId);
}
+
+ @Override
+ public String toString()
+ {
+ StringBuffer stringBuffer = new StringBuffer(ShardedChatHomeService.class.getSimpleName());
+ stringBuffer.append(", shards=[");
+ stringBuffer.append(ownedShards
+ .stream()
+ .sorted()
+ .map(String::valueOf)
+ .collect(Collectors.joining(",")));
+ stringBuffer.append("]");
+ return stringBuffer.toString();
+ }
}
Clock clock,
int bufferSize)
{
- log.info("Created SimpleChatHome for shard {}", shard);
;
this.shard = shard;
+ log.info("Created {}", this);
+
this.chatRoomInfo = new HashMap<>();
this.chatRoomData = new HashMap<>();
storageStrategy
{
return Mono.empty();
}
+
+ @Override
+ public String toString()
+ {
+ return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;
+ }
}
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
return new ChatRoomData(clock, service, bufferSize);
}
+
+ ConsumerGroupMetadata getConsumerGroupMetadata()
+ {
+ return consumer.groupMetadata();
+ }
}
byte[] serializedKey = chatRoomId.toString().getBytes();
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
+
+ @Override
+ public String toString()
+ {
+ StringBuffer stringBuffer = new StringBuffer(KafkaChatHomeService.class.getSimpleName());
+ stringBuffer.append(", ");
+ stringBuffer.append(dataChannel.getConsumerGroupMetadata());
+ return stringBuffer.toString();
+ }
}