From: Kai Moritz Date: Sat, 16 Sep 2023 19:40:45 +0000 (+0200) Subject: WIP X-Git-Tag: rebase--2024-01-26--18-11~19 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=5e3a56a969a8d5b427ca4590830e2ef088f57f55;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 381c6c68..94e049bb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -4,6 +4,7 @@ import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; +import java.net.URI; import java.nio.file.Paths; @@ -25,7 +26,8 @@ public class ChatBackendProperties { private ShardingStrategyType shardingStrategy = ShardingStrategyType.none; private int numShards = 1; - private int[] ownedShards = new int[] { 0 }; + private int[] ownedShards = new int[0]; + private URI[] shardOwners = new URI[0]; private StorageStrategyType storageStrategy = StorageStrategyType.none; private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); } @@ -34,6 +36,7 @@ public class ChatBackendProperties @Setter public static class KafkaServicesProperties { + private URI instanceUri = URI.create("http://localhost:8080"); private String clientIdPrefix; private String bootstrapServers = ":9092"; private String infoChannelTopic = "info_channel"; diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 529be8df..f3efe791 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -128,6 +128,12 @@ public class ChatBackendController .build()); } + @GetMapping("/shards") + public Mono getShardOwners() + { + return chatHomeService.getShardOwners(); + } + @PostMapping("/store") public void store() { diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index 388a20ad..19fa26c4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -15,4 +15,6 @@ public interface ChatHomeService Flux getChatRoomInfo(); Mono getChatRoomData(UUID id); + + Mono getShardOwners(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java index f60b193e..89a3c111 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -34,7 +34,8 @@ public class InMemoryServicesConfiguration return new SimpleChatHomeService( storageStrategy, clock, - properties.getChatroomBufferSize()); + properties.getChatroomBufferSize(), + properties.getInstanceUri()); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java index 06c197bf..b7268a86 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java @@ -8,6 +8,7 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.UUID; @@ -90,6 +91,15 @@ public class ShardedChatHomeService implements ChatHomeService : throwable); } + @Override + public Mono getShardOwners() + { + return Mono.just(Arrays + .stream(chatHomes) + .map(chatHome -> chatHome.getInstanceUri()) + .toArray(size -> new String[size])); + } + private int selectShard(UUID chatroomId) { return shardingStrategy.selectShard(chatroomId); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index 30a181ee..4cb490d6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -3,10 +3,12 @@ package de.juplo.kafka.chat.backend.implementation.inmemory; import de.juplo.kafka.chat.backend.domain.*; import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.URI; import java.time.Clock; import java.util.*; @@ -19,26 +21,31 @@ public class SimpleChatHomeService implements ChatHomeService private final Map chatRoomData; private final Clock clock; private final int bufferSize; + @Getter + private final URI instanceUri; public SimpleChatHomeService( StorageStrategy storageStrategy, Clock clock, - int bufferSize) + int bufferSize, + URI instanceUri) { this( null, storageStrategy, clock, - bufferSize); + bufferSize, + instanceUri); } public SimpleChatHomeService( Integer shard, StorageStrategy storageStrategy, Clock clock, - int bufferSize) + int bufferSize, + URI instanceUri) { log.info("Created SimpleChatHome for shard {}", shard); ; @@ -78,6 +85,7 @@ public class SimpleChatHomeService implements ChatHomeService }); this.clock = clock; this.bufferSize = bufferSize; + this.instanceUri = instanceUri; } @@ -114,4 +122,10 @@ public class SimpleChatHomeService implements ChatHomeService .justOrEmpty(chatRoomData.get(id)) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } + + @Override + public Mono getShardOwners() + { + return Mono.empty(); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index da906631..4eedeb4a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -138,6 +138,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener currentOffset); consumer.seek(topicPartition, nextOffset[partition]); + infoChannel.sendShardAssignedEvent(partition); }); consumer.resume(partitions); @@ -151,6 +152,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener int partition = topicPartition.partition(); isShardOwned[partition] = false; log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + infoChannel.sendShardRevokedEvent(partition); }); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 26e86963..8756a062 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -3,16 +3,20 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.URI; import java.time.*; import java.util.HashMap; import java.util.Map; @@ -30,6 +34,7 @@ public class InfoChannel implements Runnable private final long[] currentOffset; private final long[] nextOffset; private final Map chatRoomInfo; + private final String instanceUri; private boolean running; @@ -37,7 +42,8 @@ public class InfoChannel implements Runnable public InfoChannel( String topic, Producer producer, - Consumer infoChannelConsumer) + Consumer infoChannelConsumer, + URI instanceUri) { log.debug( "Creating InfoChannel for topic {}", @@ -55,6 +61,8 @@ public class InfoChannel implements Runnable IntStream .range(0, numShards) .forEach(partition -> this.nextOffset[partition] = -1l); + + this.instanceUri = instanceUri.toASCIIString(); } @@ -83,7 +91,7 @@ public class InfoChannel implements Runnable { if (metadata != null) { - log.info("Successfully sent chreate-request for chat room: {}", to); + log.info("Successfully sent created event for chat chat-room: {}", to); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard); sink.success(chatRoomInfo); } @@ -91,7 +99,7 @@ public class InfoChannel implements Runnable { // On send-failure log.error( - "Could not send create-request for chat room (id={}, name={}): {}", + "Could not send created event for chat-room (id={}, name={}): {}", chatRoomId, name, exception); @@ -101,6 +109,70 @@ public class InfoChannel implements Runnable }); } + Mono sendShardAssignedEvent(int shard) + { + EventShardAssigned to = EventShardAssigned.of(shard, instanceUri); + + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully sent shard assigned event for shard: {}", shard); + sink.success(metadata); + } + else + { + // On send-failure + log.error( + "Could not send shard assigned event for shard {}: {}", + shard, + exception); + sink.error(exception); + } + })); + }); + } + + Mono sendShardRevokedEvent(int shard) + { + EventShardRevoked to = EventShardRevoked.of(shard, instanceUri); + + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully sent shard revoked event for shard: {}", shard); + sink.success(metadata); + } + else + { + // On send-failure + log.error( + "Could not send shard revoked event for shard {}: {}", + shard, + exception); + sink.error(exception); + } + })); + }); + } + @Override public void run() @@ -146,6 +218,16 @@ public class InfoChannel implements Runnable createChatRoom(eventChatRoomCreated.toChatRoomInfo()); break; + case EVENT_SHARD_ASSIGNED: + EventShardAssigned eventShardAssigned = + (EventShardAssigned) record.value(); + break; + + case EVENT_SHARD_REVOKED: + EventShardRevoked eventShardRevoked = + (EventShardRevoked) record.value(); + break; + default: log.debug( "Ignoring message for key={} with offset={}: {}", diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java index 1aef3fd1..843a2c45 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java @@ -12,6 +12,8 @@ public class AbstractMessageTo COMMAND_CREATE_CHATROOM, EVENT_CHATMESSAGE_RECEIVED, EVENT_CHATROOM_CREATED, + EVENT_SHARD_ASSIGNED, + EVENT_SHARD_REVOKED, } @Getter diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java new file mode 100644 index 00000000..8ba23f3c --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.chat.backend.implementation.kafka.messages.info; + +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class EventShardAssigned extends AbstractMessageTo +{ + private int shard; + private String uri; + + + public EventShardAssigned() + { + super(ToType.EVENT_SHARD_ASSIGNED); + } + + + public static EventShardAssigned of( + int shard, + String uri) + { + EventShardAssigned event = new EventShardAssigned(); + event.setShard(shard); + event.setUri(uri); + return event; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java new file mode 100644 index 00000000..1e37f43f --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.chat.backend.implementation.kafka.messages.info; + +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class EventShardRevoked extends AbstractMessageTo +{ + private int shard; + private String uri; + + + public EventShardRevoked() + { + super(ToType.EVENT_SHARD_REVOKED); + } + + + public static EventShardRevoked of( + int shard, + String uri) + { + EventShardRevoked event = new EventShardRevoked(); + event.setShard(shard); + event.setUri(uri); + return event; + } +}