From: Kai Moritz Date: Sun, 17 Sep 2023 09:31:22 +0000 (+0200) Subject: WIP:haproxy X-Git-Tag: rebase--2024-01-27--15-46~3 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=f70bb25f9b0769068028348995a78ece2130cd35;p=demos%2Fkafka%2Fchat WIP:haproxy --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index ec5c7f5f..21330e17 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -13,11 +13,14 @@ import java.nio.file.Paths; @Setter public class ChatBackendProperties { + private String instanceId = "DEV"; private String allowedOrigins = "http://localhost:4200"; private int chatroomBufferSize = 8; private ServiceType services = ServiceType.inmemory; private InMemoryServicesProperties inmemory = new InMemoryServicesProperties(); private KafkaServicesProperties kafka = new KafkaServicesProperties(); + private String haproxyRuntimeApi = "haproxy:8401"; + private String haproxyMap = "/usr/local/etc/haproxy/sharding.map"; @Getter diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java new file mode 100644 index 00000000..9a1c725e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.chat.backend.domain; + +import reactor.core.publisher.Mono; + + +public interface ShardingPublisherStrategy +{ + Mono publishOwnership(int shard); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java index 25df317b..d5bc5ce4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java @@ -5,13 +5,16 @@ import lombok.Getter; public class ShardNotOwnedException extends IllegalStateException { + @Getter + private final String instanceId; @Getter private final int shard; - public ShardNotOwnedException(int shard) + public ShardNotOwnedException(String instanceId, int shard) { - super("This instance does not own the shard " + shard); + super("Instance " + instanceId + " does not own the shard " + shard); + this.instanceId = instanceId; this.shard = shard; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java new file mode 100644 index 00000000..3caaeb38 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java @@ -0,0 +1,41 @@ +package de.juplo.kafka.chat.backend.implementation.haproxy; + +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + + +@RequiredArgsConstructor +@Slf4j +public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy +{ + private final SocketAddress haproxyAddress; + private final String map; + private final String instanceId; + + + @Override + public Mono publishOwnership(int shard) + { + try + { + SocketChannel socketChannel = SocketChannel.open(haproxyAddress); + String command = "set map " + map + " " + Integer.toString(shard) + " " + instanceId + "\n"; + byte[] commandBytes = command.getBytes(); + ByteBuffer buffer = ByteBuffer.wrap(commandBytes); + socketChannel.write(buffer); + socketChannel.close(); + return Mono.just(instanceId); + } + catch (IOException e) + { + return Mono.error(e); + } + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java index bc6f103b..5b5785ea 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -58,6 +58,7 @@ public class InMemoryServicesConfiguration properties.getChatroomBufferSize())); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); return new ShardedChatHomeService( + properties.getInstanceId(), chatHomes, properties.getInmemory().getShardOwners(), strategy); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java index c281d9e1..ab7f8d43 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java @@ -19,6 +19,7 @@ import java.util.stream.Collectors; @Slf4j public class ShardedChatHomeService implements ChatHomeService { + private final String instanceId; private final SimpleChatHomeService[] chatHomes; private final Set ownedShards; private final String[] shardOwners; @@ -26,10 +27,12 @@ public class ShardedChatHomeService implements ChatHomeService public ShardedChatHomeService( + String instanceId, SimpleChatHomeService[] chatHomes, URI[] shardOwners, ShardingStrategy shardingStrategy) { + this.instanceId = instanceId; this.chatHomes = chatHomes; this.shardOwners = Arrays .stream(shardOwners) @@ -54,7 +57,7 @@ public class ShardedChatHomeService implements ChatHomeService { int shard = shardingStrategy.selectShard(id); return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) + ? Mono.error(new ShardNotOwnedException(instanceId, shard)) : chatHomes[shard].createChatRoom(id, name); } @@ -63,7 +66,7 @@ public class ShardedChatHomeService implements ChatHomeService { int shard = selectShard(id); return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) + ? Mono.error(new ShardNotOwnedException(instanceId, shard)) : chatHomes[shard] .getChatRoomInfo(id) .onErrorMap(throwable -> throwable instanceof UnknownChatroomException @@ -87,7 +90,7 @@ public class ShardedChatHomeService implements ChatHomeService { int shard = selectShard(id); return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) + ? Mono.error(new ShardNotOwnedException(instanceId, shard)) : chatHomes[shard] .getChatRoomData(id) .onErrorMap(throwable -> throwable instanceof UnknownChatroomException diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 4eedeb4a..532a3c12 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -19,13 +19,13 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.*; -import java.util.function.Function; import java.util.stream.IntStream; @Slf4j public class DataChannel implements Runnable, ConsumerRebalanceListener { + private final String instanceId; private final String topic; private final Producer producer; private final Consumer consumer; @@ -38,6 +38,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final long[] nextOffset; private final Map[] chatRoomData; private final InfoChannel infoChannel; + private final ShardingPublisherStrategy shardingPublisherStrategy; private boolean running; @Getter @@ -45,6 +46,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener public DataChannel( + String instanceId, String topic, Producer producer, Consumer dataChannelConsumer, @@ -52,12 +54,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener int numShards, int bufferSize, Clock clock, - InfoChannel infoChannel) + InfoChannel infoChannel, + ShardingPublisherStrategy shardingPublisherStrategy) { log.debug( - "Creating DataChannel for topic {} with {} partitions", + "{}: Creating DataChannel for topic {} with {} partitions", + instanceId, topic, numShards); + this.instanceId = instanceId; this.topic = topic; this.consumer = dataChannelConsumer; this.producer = producer; @@ -73,6 +78,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener .range(0, numShards) .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); this.infoChannel = infoChannel; + this.shardingPublisherStrategy = shardingPublisherStrategy; } @@ -139,6 +145,13 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener consumer.seek(topicPartition, nextOffset[partition]); infoChannel.sendShardAssignedEvent(partition); + shardingPublisherStrategy + .publishOwnership(partition) + .doOnNext(instanceId -> log.info( + "Instance {} was published as owner of shard {}", + instanceId, + partition)) + .subscribe(); }); consumer.resume(partitions); @@ -291,7 +304,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener if (!isShardOwned[shard]) { - return Mono.error(new ShardNotOwnedException(shard)); + return Mono.error(new ShardNotOwnedException(instanceId, shard)); } return infoChannel diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index cafc7757..54c48301 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -2,6 +2,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; @@ -21,6 +23,7 @@ import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import java.net.InetSocketAddress; import java.time.Clock; import java.time.ZoneId; import java.util.HashMap; @@ -137,9 +140,11 @@ public class KafkaServicesConfiguration Consumer dataChannelConsumer, ZoneId zoneId, Clock clock, - InfoChannel infoChannel) + InfoChannel infoChannel, + ShardingPublisherStrategy shardingPublisherStrategy) { return new DataChannel( + properties.getInstanceId(), properties.getKafka().getDataChannelTopic(), producer, dataChannelConsumer, @@ -147,7 +152,8 @@ public class KafkaServicesConfiguration properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), clock, - infoChannel); + infoChannel, + shardingPublisherStrategy); } @Bean @@ -279,6 +285,18 @@ public class KafkaServicesConfiguration return properties; } + @Bean + ShardingPublisherStrategy shardingPublisherStrategy( + ChatBackendProperties properties) + { + String[] parts = properties.getHaproxyRuntimeApi().split(":"); + InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1])); + return new HaproxyShardingPublisherStrategy( + haproxyAddress, + properties.getHaproxyMap(), + properties.getInstanceId()); + } + @Bean ZoneId zoneId() { diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index 1478e7c5..01de390c 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -259,8 +259,9 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -279,8 +280,9 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -301,8 +303,9 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -328,8 +331,9 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -352,8 +356,9 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java index 3ff9e9e1..b830f300 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java @@ -36,6 +36,7 @@ public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS); return new ShardedChatHomeService( + "http://instance-0", chatHomes, IntStream .range(0, NUM_SHARDS) diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index 956d7cec..52a527d3 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; @@ -10,6 +11,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; +import reactor.core.publisher.Mono; import java.time.Clock; import java.util.List; @@ -23,6 +25,12 @@ public class KafkaTestUtils @Import(KafkaServicesConfiguration.class) public static class KafkaTestConfiguration { + @Bean + public ShardingPublisherStrategy shardingPublisherStrategy() + { + return shard -> Mono.just("MOCKED!"); + } + @Bean public WorkAssignor dataChannelWorkAssignor( ChatBackendProperties properties,