private int chatroomBufferSize = 8;
private ServiceType services = ServiceType.inmemory;
private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
+ private KafkaServicesProperties kafka = new KafkaServicesProperties();
@Getter
private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString();
}
+ @Getter
+ @Setter
+ public static class KafkaServicesProperties
+ {
+ private String topic = "test";
+ private int numPartitions = 2;
+ }
+
public enum ServiceType { inmemory }
public enum StorageStrategyType { files, mongodb }
public enum ShardingStrategyType { none, kafkalike }
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.util.UUID;
-
-
-/**
- * Derzeit eigentlich einzige aktive Strategie!
- * Rückbau?!?!
- */
-@RequiredArgsConstructor
-@Slf4j
-class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
-{
- private final KafkaChatRoomService kafkaChatRoomService;
- private final Producer<String, MessageTo> producer;
- private final TopicPartition tp;
- private final UUID chatRoomId;
- private final ZoneOffset zoneOffset;
-
-
- @Override
- public Mono<Message> handleMessage(
- Message.MessageKey key,
- LocalDateTime timestamp,
- String text)
- {
- return Mono.create(sink ->
- {
- ProducerRecord<String, MessageTo> record =
- new ProducerRecord<>(
- tp.topic(),
- tp.partition(),
- timestamp.toEpochSecond(zoneOffset),
- chatRoomId.toString(),
- MessageTo.of(key.getUsername(), key.getMessageId(), text));
-
- producer.send(record, ((metadata, exception) ->
- {
- if (metadata != null)
- {
- // On successful send
- {
- // Emit new message
- Message message = new Message(key, metadata.offset(), timestamp, text);
- kafkaChatRoomService.addMessage(message);
- }
-
- sink.success();
- }
- else
- {
- // On send-failure
- sink.error(exception);
- }
- }));
- });
- }
-}
import java.time.*;
import java.util.*;
-import java.util.concurrent.ExecutorService;
import java.util.stream.IntStream;
@Slf4j
public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
{
- private final ExecutorService executorService;
private final Consumer<String, MessageTo> consumer;
private final Producer<String, MessageTo> producer;
private final String topic;
private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
- private final Map<UUID, ChatRoom>[] chatRoomMaps;
+ private final Map<UUID, ChatRoom>[] chatrooms;
private final KafkaLikeShardingStrategy shardingStrategy;
private boolean running;
public KafkaChatHomeService(
- ExecutorService executorService,
Consumer<String, MessageTo> consumer,
Producer<String, MessageTo> producer,
String topic,
int numShards)
{
log.debug("Creating KafkaChatHomeService");
- this.executorService = executorService;
this.consumer = consumer;
this.producer = producer;
this.topic = topic;
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
- this.chatRoomMaps = new Map[numShards];
+ this.chatrooms = new Map[numShards];
this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
- ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
+ ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
KafkaChatRoomService kafkaChatRoomService =
(KafkaChatRoomService) chatRoom.getChatRoomService();
}
+ public void putChatRoom(ChatRoom chatRoom)
+ {
+ // TODO: Nachricht senden!
+ chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+ }
+
@Override
public Mono<ChatRoom> getChatRoom(int shard, UUID id)
{
}
else
{
- return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
+ return Mono.justOrEmpty(chatrooms[shard].get(id));
}
}
}
else
{
- return Flux.fromStream(chatRoomMaps[shard].values().stream());
+ return Flux.fromStream(chatrooms[shard].values().stream());
}
}
}
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
+import java.time.Clock;
import java.util.UUID;
+@RequiredArgsConstructor
+@Slf4j
public class KafkaChatRoomFactory implements ChatRoomFactory
{
+ private final KafkaChatHomeService kafkaChatHomeService;
+ private final ShardingStrategy shardingStrategy;
+ private final Clock clock;
+ private final int bufferSize;
+
@Override
public Mono<ChatRoom> createChatRoom(UUID id, String name)
{
- return null;
+ log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+ int shard = shardingStrategy.selectShard(id);
+ KafkaChatRoomService service = new KafkaChatRoomService(kafkaChatHomeService, id);
+ ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+ kafkaChatHomeService.putChatRoom(chatRoom);
}
}
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
-import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
+import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.time.Clock;
+
+
+@ConditionalOnProperty(
+ prefix = "chat.backend",
+ name = "services",
+ havingValue = "inmemory",
+ matchIfMissing = true)
+@Configuration
+public class KafkaServicesConfiguration implements ApplicationRunner
+{
+ @Bean
+ ChatHome kafkaChatHome(
+ ChatBackendProperties properties,
+ InMemoryChatHomeService chatHomeService,
+ StorageStrategy storageStrategy)
+ {
+ int numShards = properties.getInmemory().getNumShards();
+ SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+ return new ShardedChatHome(chatHomes, strategy);
+ }
+
+ @Bean
+ KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties)
+ {
+ ShardingStrategyType sharding =
+ properties.getInmemory().getShardingStrategy();
+ int numShards = sharding == ShardingStrategyType.none
+ ? 1
+ : properties.getInmemory().getNumShards();
+ int[] ownedShards = sharding == ShardingStrategyType.none
+ ? new int[] { 0 }
+ : properties.getInmemory().getOwnedShards();
+ return new InMemoryChatHomeService(
+ numShards,
+ ownedShards,
+ storageStrategy.read());
+ }
+
+ @Bean
+ InMemoryChatRoomFactory chatRoomFactory(
+ InMemoryChatHomeService service,
+ ShardingStrategy strategy,
+ Clock clock,
+ ChatBackendProperties properties)
+ {
+ return new InMemoryChatRoomFactory(
+ service,
+ strategy,
+ clock,
+ properties.getChatroomBufferSize());
+ }
+
+ @Bean
+ ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+ {
+ return new KafkaLikeShardingStrategy(
+ properties.getKafka().getNumPartitions());
+ }
+}