From e9f7c11c3fc1a5ee3004aee194e4901fe6addb36 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 22 Sep 2023 16:56:06 +0200 Subject: [PATCH] WIP:haproxy --- .../kafka/chat/backend/ChatBackendProperties.java | 2 +- .../domain/exceptions/ShardNotOwnedException.java | 7 +++++-- .../inmemory/InMemoryServicesConfiguration.java | 1 + .../inmemory/ShardedChatHomeService.java | 9 ++++++--- .../backend/implementation/kafka/DataChannel.java | 9 ++++++--- .../kafka/KafkaServicesConfiguration.java | 4 ++-- .../backend/api/ChatBackendControllerTest.java | 15 ++++++++++----- .../inmemory/ShardedChatHomeServiceTest.java | 1 + 8 files changed, 32 insertions(+), 16 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 9ace559d..21330e17 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -13,6 +13,7 @@ import java.nio.file.Paths; @Setter public class ChatBackendProperties { + private String instanceId = "DEV"; private String allowedOrigins = "http://localhost:4200"; private int chatroomBufferSize = 8; private ServiceType services = ServiceType.inmemory; @@ -20,7 +21,6 @@ public class ChatBackendProperties private KafkaServicesProperties kafka = new KafkaServicesProperties(); private String haproxyRuntimeApi = "haproxy:8401"; private String haproxyMap = "/usr/local/etc/haproxy/sharding.map"; - private String haproxyInstanceId = "DEV"; @Getter diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java index 25df317b..d5bc5ce4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java @@ -5,13 +5,16 @@ import lombok.Getter; public class ShardNotOwnedException extends IllegalStateException { + @Getter + private final String instanceId; @Getter private final int shard; - public ShardNotOwnedException(int shard) + public ShardNotOwnedException(String instanceId, int shard) { - super("This instance does not own the shard " + shard); + super("Instance " + instanceId + " does not own the shard " + shard); + this.instanceId = instanceId; this.shard = shard; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java index bc6f103b..5b5785ea 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -58,6 +58,7 @@ public class InMemoryServicesConfiguration properties.getChatroomBufferSize())); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); return new ShardedChatHomeService( + properties.getInstanceId(), chatHomes, properties.getInmemory().getShardOwners(), strategy); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java index c281d9e1..ab7f8d43 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java @@ -19,6 +19,7 @@ import java.util.stream.Collectors; @Slf4j public class ShardedChatHomeService implements ChatHomeService { + private final String instanceId; private final SimpleChatHomeService[] chatHomes; private final Set ownedShards; private final String[] shardOwners; @@ -26,10 +27,12 @@ public class ShardedChatHomeService implements ChatHomeService public ShardedChatHomeService( + String instanceId, SimpleChatHomeService[] chatHomes, URI[] shardOwners, ShardingStrategy shardingStrategy) { + this.instanceId = instanceId; this.chatHomes = chatHomes; this.shardOwners = Arrays .stream(shardOwners) @@ -54,7 +57,7 @@ public class ShardedChatHomeService implements ChatHomeService { int shard = shardingStrategy.selectShard(id); return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) + ? Mono.error(new ShardNotOwnedException(instanceId, shard)) : chatHomes[shard].createChatRoom(id, name); } @@ -63,7 +66,7 @@ public class ShardedChatHomeService implements ChatHomeService { int shard = selectShard(id); return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) + ? Mono.error(new ShardNotOwnedException(instanceId, shard)) : chatHomes[shard] .getChatRoomInfo(id) .onErrorMap(throwable -> throwable instanceof UnknownChatroomException @@ -87,7 +90,7 @@ public class ShardedChatHomeService implements ChatHomeService { int shard = selectShard(id); return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) + ? Mono.error(new ShardNotOwnedException(instanceId, shard)) : chatHomes[shard] .getChatRoomData(id) .onErrorMap(throwable -> throwable instanceof UnknownChatroomException diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 1a2f1b0f..532a3c12 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -19,13 +19,13 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.*; -import java.util.function.Function; import java.util.stream.IntStream; @Slf4j public class DataChannel implements Runnable, ConsumerRebalanceListener { + private final String instanceId; private final String topic; private final Producer producer; private final Consumer consumer; @@ -46,6 +46,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener public DataChannel( + String instanceId, String topic, Producer producer, Consumer dataChannelConsumer, @@ -57,9 +58,11 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener ShardingPublisherStrategy shardingPublisherStrategy) { log.debug( - "Creating DataChannel for topic {} with {} partitions", + "{}: Creating DataChannel for topic {} with {} partitions", + instanceId, topic, numShards); + this.instanceId = instanceId; this.topic = topic; this.consumer = dataChannelConsumer; this.producer = producer; @@ -301,7 +304,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener if (!isShardOwned[shard]) { - return Mono.error(new ShardNotOwnedException(shard)); + return Mono.error(new ShardNotOwnedException(instanceId, shard)); } return infoChannel diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 2d0f4c85..4d1098fb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -22,7 +22,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.web.reactive.function.client.WebClient; import java.net.InetSocketAddress; import java.time.Clock; @@ -143,6 +142,7 @@ public class KafkaServicesConfiguration ShardingPublisherStrategy shardingPublisherStrategy) { return new DataChannel( + properties.getInstanceId(), properties.getKafka().getDataChannelTopic(), producer, dataChannelConsumer, @@ -292,7 +292,7 @@ public class KafkaServicesConfiguration return new HaproxyShardingPublisherStrategy( haproxyAddress, properties.getHaproxyMap(), - properties.getHaproxyInstanceId()); + properties.getInstanceId()); } @Bean diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index 1478e7c5..01de390c 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -259,8 +259,9 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -279,8 +280,9 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -301,8 +303,9 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -328,8 +331,9 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -352,8 +356,9 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java index 3ff9e9e1..b830f300 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java @@ -36,6 +36,7 @@ public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS); return new ShardedChatHomeService( + "http://instance-0", chatHomes, IntStream .range(0, NUM_SHARDS) -- 2.20.1