--- /dev/null
+Aktuelle Idee für die Kafka-Anbindung
+=====================================
+
+- *Beobachtung:* Alle schreibenden Anfragen für Nachrichten müssen erst
+ durch `ChatHomeService.getChatRoom(int, UUID)` den zuständigen
+ `ChatRoom` ermitteln, bevor sie die Nachricht schreiben können.
+ - D.h., das Locking, das während einem Rebalance nötig ist, kann
+ *vollständig* in `KafkaChatHomeService` umgesetzt werden.
+ - In `KafkaChatRoomService` muss *keinerlei* Kontrolle mehr erfolgen,
+ ob der `ChatRoom` tatsächlich gerade in die Zuständigkeit der Instanz
+ fällt, da die Anfragen *hier nie ankommen*, wenn die Instanz nicht
+ zuständig ist, da sie dann bereits in `getChatRoom(int, UUID)`
+ abgelehnt werden!
+ - Die in der Domain-Klasse `ChatRoom` definierte Logik, für die
+ Behandlung doppelter Nachrichten *ist vollständig valide*, da Anfragen
+ für einen bestimmten `ChatRoom` dort (bei korrekt implementiertem Locking
+ in `KafkaChatHomeService`) nur ankommen, wenn die Instanz *tatsächlich*
+ für den `ChatRoom` zuständig ist.
+ - D.h. insbesondere auch, dass die Antwort dort (also in dem `ChatRoom`)
+ erst ankommen, wenn dieser *vollständig geladen* ist, so dass die lokale
+ Kontrolle auf doppelte Nachrichten logisch gültig ist.
+- *Anforderung:* Wenn ein Rebalance aktiv ist, wird die Instanz gelockt.
+ - Das Locking erfolg in `KafkaChatRoomService`, durch das alle Anfragen
+ durchgreifen müssen, so dass hier *zentral alle Aktionen* auf einzelnen
+ `ChatRoom`-Instanzen *unterbunden* werden können.
+- *Vereinfachung:* Wenn `KafkaChatRoomService` gelockt ist, wird für alle
+ Zugriffe eine `ShardNotOwnedException` erzeugt.
+ - Dadurch wird das Zustands-Handling *extrem vereinfacht*, da Anfragen,
+ die *während* einem Rebalance auflaufen
+- *Lade-Modus - Initialisierung und Abschluss-Bedingung:*
+ - Wenn durch einen Rebalance in den Lade-Modus gewechselt wird, muss die
+ *tatsächliche* Offset-Position der zuletzt geschriebenen Nachrichten
+ für die zugeordneten Partitionen ermittelt werden.
+ - Anschließend wird ein Seek auf die Offset-Position 0 (später: auf die
+ in den lokalen Daten gespeicherte Offset-Position) durchgeführt.
+ - Der Lade-Modus ist abgeschlossen, wenn für alle zugeordneten Partitionen
+ der zum Rebalance-Zeitpunkt ermittelte Offset der aktuellsten Nachrichten
+ erreicht ist.
+ - Wenn ein weiterer Rebalance erfolgt, während der Lade-Modus bereits
+ aktiv ist, sollte es genügen, die Informationen über die zugeordneten
+ Partitionen zu aktualisieren und die Aktuellen Offsets für diese neu
+ zu ermitteln.
+- *Lade-Modus vs. Default-Modus:*
+ - Nur während des Lade-Modus *liest* die `KafkaChatRoomServcie`-Instanz
+ tatsächlich die Nachrichten aus den zugeordneten Partitionen.
+ - Im Default-Modus *schreibt* sie die Nachrichten nur in die Partitionen
+ und speichert sie lokal ab, sobald die *Bestätigung durch den `Producer`*
+ erfolgt.
+ - D.h. insbesondere, dass der `KafkaConsumer` im Default-Modus für alle
+ zugeordneten Partitionen *pausiert* wird!
+ - Damit die Offset-Positon nicht unnötig zurückfällt, sollte ggf.
+ regelmäßig für alle zugeordneten Partitionen ein Seek auf die zuletzt
+ vom Producer bestätigt geschriebenen Offsets durchgeführt werden.
+ - *Beachte:_ Dies ist nicht nötig, wenn die Offsets eh in den lokal
+ gespeicherten Daten gehalten und aus diesen wiederhergestellt werden!
+- *Umsetzungs-Details:*
+ - Da die in dem Interface `ConsumerRebalanceListener` definierten Methoden
+ in einem zeitkritischem Setting laufen, muss das eigentliche Laden der
+ `ChatRoom`-Zustände separat erfolgen, so dass die Kontrolle schnell an
+ den `KafkaConsumer` zurückgegeben werden kann.
+ - Dafür muss der `KafkaChatRoomService` in einen speziellen Lade-Modus
+ wechseln, der aktiv ist, bis die `ChatRoom`-Instanzen für alle durch
+ den Rebalance zugeteilten Partitionen aus dem Log wiederhergestellt
+ wurden.
+ - Das Lock der `KafkaChatRoomService`-Instanz muss während dieser
+ gesmaten Phase aufrecht erhalten werden: Es wird erst gelöst, wenn
+ die Instanz in den normalen Modus zurückwechselt.
+ - D.h. insbesondere auch, dass während dieser ganzen Phase _alle_
+ Anfragen mit `ShardNotOwnedException` abgelehnt werden!
+ - Eine besondere Herausforderung sind *erneute* Rebalances, die
+ Auftreten, *während* der `KafkaChatRoomService` sich noch in einem
+ durch einen vorherigen Rebalance ausgelösten Lade-Modus befindet!
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
private int chatroomBufferSize = 8;
private ServiceType services = ServiceType.inmemory;
private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
+ private KafkaServicesProperties kafka = new KafkaServicesProperties();
@Getter
private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString();
}
- public enum ServiceType { inmemory }
+ @Getter
+ @Setter
+ public static class KafkaServicesProperties
+ {
+ private String clientIdPrefix;
+ private String bootstrapServers = ":9092";
+ private String chatroomChannelTopic = "chatroom_channel";
+ private String messageChannelTopic = "message_channel";
+ private int numPartitions = 2;
+ }
+
+ public enum ServiceType { inmemory, kafka }
public enum StorageStrategyType { files, mongodb }
public enum ShardingStrategyType { none, kafkalike }
}
}
+ public ChatRoomService getChatRoomService()
+ {
+ return service;
+ }
+
public Mono<Message> getMessage(String username, Long messageId)
{
Message.MessageKey key = Message.MessageKey.of(username, messageId);
public interface ChatRoomFactory
{
- Mono<ChatRoom> createChatRoom(UUID id, String name);
+ Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
}
@Override
- public Mono<ChatRoom> createChatRoom(UUID id, String name)
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
log.info("Creating ChatRoom with buffer-size {}", bufferSize);
int shard = shardingStrategy.selectShard(id);
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-class ChatHomeLoader
-{
- private final Producer<String, MessageTo> producer;
- private final long offsetOfFirstUnseenMessage;
- private final ZoneId zoneId;
- @Getter
- private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
-
-
- /**
- * Rebuilds the state of the {@link KafkaChatHomeService} by consuming
- * all messages, that belong to the partition, that defines the shard this
- * service represents.
- * The loader signals, that its work is done, if the given end offset is reached.
- * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition.
- * @return {@code true}, if all messages are consumed.
- */
- boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
- {
- TopicPartition topicPartition =
- new TopicPartition(record.topic(), record.partition());
- Message.MessageKey messageKey = Message.MessageKey.of(
- record.value().getUser(),
- record.value().getId());
-
- if (record.offset() >= offsetOfFirstUnseenMessage)
- {
- // All messages consumed: DONE!
- log.trace(
- "Ignoring unseen message {} on {}, offset={}",
- messageKey,
- topicPartition,
- record.offset());
- return true;
- }
-
- Instant timestamp = Instant.ofEpochMilli(record.timestamp());
- LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId);
-
- KafkaChatRoomService service = kafkaChatRoomServiceMap
- .computeIfAbsent(record.key(), key ->
- new KafkaChatRoomService(producer, topicPartition));
-
- service.addMessage(new Message(
- messageKey,
- record.offset(),
- time,
- record.value().getText()));
-
- return false;
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.*;
+import java.util.*;
+import java.util.stream.IntStream;
+
+
+@Slf4j
+public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
+{
+ private final String topic;
+ private final Producer<String, MessageTo> producer;
+ private final Consumer<String, MessageTo> consumer;
+ private final ZoneId zoneId;
+ private final int numShards;
+ private final boolean[] isShardOwned;
+ private final long[] currentOffset;
+ private final long[] nextOffset;
+ private final Map<UUID, ChatRoom>[] chatrooms;
+ private final KafkaLikeShardingStrategy shardingStrategy;
+
+ private boolean running;
+ @Getter
+ private volatile boolean loadInProgress;
+
+
+ public ChatMessageChannel(
+ String topic,
+ Producer<String, MessageTo> producer,
+ Consumer<String, MessageTo> consumer,
+ ZoneId zoneId,
+ int numShards)
+ {
+ log.debug(
+ "Creating ChatMessageChannel for topic {} with {} partitions",
+ topic,
+ numShards);
+ this.topic = topic;
+ this.consumer = consumer;
+ this.producer = producer;
+ this.zoneId = zoneId;
+ this.numShards = numShards;
+ this.isShardOwned = new boolean[numShards];
+ this.currentOffset = new long[numShards];
+ this.nextOffset = new long[numShards];
+ this.chatrooms = new Map[numShards];
+ IntStream
+ .range(0, numShards)
+ .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
+ this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
+ }
+
+
+ Mono<Message> sendMessage(
+ UUID chatRoomId,
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text)
+ {
+ int shard = this.shardingStrategy.selectShard(chatRoomId);
+ TopicPartition tp = new TopicPartition(topic, shard);
+ ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
+ return Mono.create(sink ->
+ {
+ ProducerRecord<String, MessageTo> record =
+ new ProducerRecord<>(
+ tp.topic(),
+ tp.partition(),
+ zdt.toEpochSecond(),
+ chatRoomId.toString(),
+ MessageTo.of(key.getUsername(), key.getMessageId(), text));
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ // On successful send
+ Message message = new Message(key, metadata.offset(), timestamp, text);
+ log.info("Successfully send message {}", message);
+ sink.success(message);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
+ chatRoomId,
+ key,
+ timestamp,
+ text,
+ exception);
+ sink.error(exception);
+ }
+ }));
+ });
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ log.info("Newly assigned partitions! Pausing normal operations...");
+ loadInProgress = true;
+
+ consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
+ {
+ int partition = topicPartition.partition();
+ isShardOwned[partition] = true;
+ this.currentOffset[partition] = currentOffset;
+
+ log.info(
+ "Partition assigned: {} - loading messages: next={} -> current={}",
+ partition,
+ nextOffset[partition],
+ currentOffset);
+
+ consumer.seek(topicPartition, nextOffset[partition]);
+ });
+
+ consumer.resume(partitions);
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(topicPartition ->
+ {
+ int partition = topicPartition.partition();
+ isShardOwned[partition] = false;
+ log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+ });
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ log.warn("Lost partitions: {}, partitions");
+ // TODO: Muss auf den Verlust anders reagiert werden?
+ onPartitionsRevoked(partitions);
+ }
+
+ @Override
+ public void run()
+ {
+ consumer.subscribe(List.of(topic), this);
+
+ running = true;
+
+ while (running)
+ {
+ try
+ {
+ ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ log.info("Fetched {} messages", records.count());
+
+ if (loadInProgress)
+ {
+ loadMessages(records);
+
+ if (isLoadingCompleted())
+ {
+ log.info("Loading of messages completed! Pausing all owned partitions...");
+ pauseAllOwnedPartions();
+ log.info("Resuming normal operations...");
+ loadInProgress = false;
+ }
+ }
+ else
+ {
+ if (!records.isEmpty())
+ {
+ throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
+ }
+ }
+ }
+ catch (WakeupException e)
+ {
+ log.info("Received WakeupException, exiting!");
+ running = false;
+ }
+ }
+
+ log.info("Exiting normally");
+ }
+
+ void loadMessages(ConsumerRecords<String, MessageTo> records)
+ {
+ for (ConsumerRecord<String, MessageTo> record : records)
+ {
+ nextOffset[record.partition()] = record.offset() + 1;
+ UUID chatRoomId = UUID.fromString(record.key());
+ MessageTo messageTo = record.value();
+
+ Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
+
+ Instant instant = Instant.ofEpochSecond(record.timestamp());
+ LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+
+ Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
+
+ ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
+ if (chatRoom == null)
+ {
+ // Alles pausieren und erst von putChatRoom wieder resumen lassen!
+ }
+ KafkaChatRoomService kafkaChatRoomService =
+ (KafkaChatRoomService) chatRoom.getChatRoomService();
+
+ kafkaChatRoomService.persistMessage(message);
+ }
+ }
+
+ boolean isLoadingCompleted()
+ {
+ return IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
+ }
+
+ void pauseAllOwnedPartions()
+ {
+ consumer.pause(IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .mapToObj(shard -> new TopicPartition(topic, shard))
+ .toList());
+ }
+
+
+ void putChatRoom(ChatRoom chatRoom)
+ {
+ Integer partition = chatRoom.getShard();
+ UUID chatRoomId = chatRoom.getId();
+ if (chatrooms[partition].containsKey(chatRoomId))
+ {
+ log.warn("Ignoring existing chat-room: " + chatRoom);
+ }
+ else
+ {
+ log.info(
+ "Adding new chat-room to partition {}: {}",
+ partition,
+ chatRoom);
+
+ chatrooms[partition].put(chatRoomId, chatRoom);
+ }
+ }
+
+ Mono<ChatRoom> getChatRoom(int shard, UUID id)
+ {
+ return Mono.justOrEmpty(chatrooms[shard].get(id));
+ }
+
+ Flux<ChatRoom> getChatRooms()
+ {
+ return Flux.fromStream(IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .mapToObj(shard -> Integer.valueOf(shard))
+ .flatMap(shard -> chatrooms[shard].values().stream()));
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.util.UUID;
-
-
-/**
- * TODO:
- * Actual the only active strategy!
- * {@link MessageHandlingStrategy} probably not needed: Refactor!
- */
-@RequiredArgsConstructor
-@Slf4j
-class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
-{
- private final KafkaChatRoomService kafkaChatRoomService;
- private final Producer<String, MessageTo> producer;
- private final TopicPartition tp;
- private final UUID chatRoomId;
- private final ZoneOffset zoneOffset;
-
-
- @Override
- public Mono<Message> handleMessage(
- Message.MessageKey key,
- LocalDateTime timestamp,
- String text)
- {
- return Mono.create(sink ->
- {
- ProducerRecord<String, MessageTo> record =
- new ProducerRecord<>(
- tp.topic(),
- tp.partition(),
- timestamp.toEpochSecond(zoneOffset),
- chatRoomId.toString(),
- MessageTo.of(key.getUsername(), key.getMessageId(), text));
-
- producer.send(record, ((metadata, exception) ->
- {
- if (metadata != null)
- {
- // On successful send
- {
- // Emit new message
- Message message = new Message(key, metadata.offset(), timestamp, text);
- kafkaChatRoomService.addMessage(message);
- }
-
- sink.success();
- }
- else
- {
- // On send-failure
- sink.error(exception);
- }
- }));
- });
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import reactor.core.publisher.Mono;
+
+import java.time.*;
+import java.util.List;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ChatRoomChannel implements Runnable
+{
+ private final String topic;
+ private final Producer<Integer, ChatRoomTo> producer;
+ private final Consumer<Integer, ChatRoomTo> consumer;
+ private final ShardingStrategy shardingStrategy;
+ private final ChatMessageChannel chatMessageChannel;
+ private final Clock clock;
+ private final int bufferSize;
+
+ private boolean running;
+
+
+ Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+ UUID chatRoomId,
+ String name)
+ {
+ int shard = this.shardingStrategy.selectShard(chatRoomId);
+ ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId.toString(), name, shard);
+ return Mono.create(sink ->
+ {
+ ProducerRecord<Integer, ChatRoomTo> record =
+ new ProducerRecord<>(
+ topic,
+ shard,
+ chatRoomTo);
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ log.info("Successfully send chreate-request for chat room: {}", chatRoomTo);
+ sink.success(chatRoomTo.toChatRoomInfo());
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send create-request for chat room (id={}, name={}): {}",
+ chatRoomId,
+ name,
+ exception);
+ sink.error(exception);
+ }
+ }));
+ });
+ }
+
+ @Override
+ public void run()
+ {
+ consumer.assign(List.of(new TopicPartition(topic, 0)));
+
+ running = true;
+
+ while (running)
+ {
+ try
+ {
+ ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
+ log.info("Fetched {} messages", records.count());
+
+ for (ConsumerRecord<Integer, ChatRoomTo> record : records)
+ {
+ createChatRoom(record.value().toChatRoomInfo());
+ }
+ }
+ catch (WakeupException e)
+ {
+ log.info("Received WakeupException, exiting!");
+ running = false;
+ }
+ }
+
+ log.info("Exiting normally");
+ }
+
+
+ void createChatRoom(ChatRoomInfo chatRoomInfo)
+ {
+ UUID id = chatRoomInfo.getId();
+ String name = chatRoomInfo.getName();
+ int shard = chatRoomInfo.getShard();
+ log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
+ ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+ chatMessageChannel.putChatRoom(chatRoom);
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-
-
-/**
- * TODO:
- * Not used anywhere
- * {@link ChatRoomActiveMessageHandlingStrategy} is the only active strategy at the moment.
- * Refactor?
- */
-class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
-{
- private final Consumer consumer;
- private final TopicPartition tp;
- private final long currentOffset;
- private final long unseenOffset;
-
- ChatRoomLoadingMessageHandlingStrategy(
- Consumer consumer,
- TopicPartition tp,
- long currentOffset,
- long unseenOffset)
- {
- this.consumer = consumer;
- this.tp = tp;
- this.currentOffset = currentOffset;
- this.unseenOffset = unseenOffset;
-
- consumer.seek(tp, unseenOffset);
- }
-
- @Override
- public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
- {
- // TODO
- return null;
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.UUID;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class ChatRoomTo
+{
+ private String id;
+ private String name;
+ private int shard;
+
+ public ChatRoomInfo toChatRoomInfo()
+ {
+ return new ChatRoomInfo(UUID.fromString(id), name, shard);
+ }
+
+ public static ChatRoomTo from(ChatRoom chatRoom)
+ {
+ return ChatRoomTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.TopicPartition;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-
-
-@RequiredArgsConstructor
-class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy
-{
- private final int shard;
-
- @Override
- public Mono<Message> handleMessage(
- Message.MessageKey key,
- LocalDateTime timestamp,
- String text)
- {
- throw new ShardNotOwnedException(shard);
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatHome implements ChatHome
+{
+ private final ShardingStrategy shardingStrategy;
+ private final ChatMessageChannel chatMessageChanel;
+
+
+ @Override
+ public Mono<ChatRoom> getChatRoom(UUID id)
+ {
+ int shard = shardingStrategy.selectShard(id);
+ if (chatMessageChanel.isLoadInProgress())
+ {
+ throw new LoadInProgressException(shard);
+ }
+ else
+ {
+ return chatMessageChanel.getChatRoom(shard, id);
+ }
+ }
+
+ @Override
+ public Flux<ChatRoom> getChatRooms()
+ {
+ if (chatMessageChanel.isLoadInProgress())
+ {
+ throw new LoadInProgressException();
+ }
+ else
+ {
+ return chatMessageChanel.getChatRooms();
+ }
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.TopicPartition;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.ZoneId;
-import java.util.*;
-
-
-@Slf4j
-public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
-{
- private final Consumer<String, MessageTo> consumer;
- private final Producer<String, MessageTo> producer;
- private final String topic;
- private final ZoneId zoneId;
- // private final long[] offsets; Erst mal immer alles neu einlesen
- private final ChatHomeLoader[] chatHomeLoaders;
- private final Map<UUID, ChatRoom>[] chatRoomMaps;
-
-
- public KafkaChatHomeService(
- Consumer<String, MessageTo> consumer,
- Producer<String, MessageTo> producer,
- String topic,
- ZoneId zoneId,
- int numShards)
- {
- log.debug("Creating KafkaChatHomeService");
- this.consumer = consumer;
- this.producer = producer;
- this.topic = topic;
- this.zoneId = zoneId;
- // this.offsets = new long[numShards];
- // for (int i=0; i< numShards; i++)
- // {
- // this.offsets[i] = 0l;
- // }
- this.chatHomeLoaders = new ChatHomeLoader[numShards];
- this.chatRoomMaps = new Map[numShards];
- }
-
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
- {
- if (!topicPartition.topic().equals(topic))
- {
- log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
- return;
- }
-
- int partition = topicPartition.partition();
- long unseenOffset = 0; // offsets[partition];
-
- log.info(
- "Loading messages from partition {}: start-offset={} -> current-offset={}",
- partition,
- unseenOffset,
- currentOffset);
-
- // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
- consumer.seek(topicPartition, unseenOffset);
- chatHomeLoaders[partition] = new ChatHomeLoader(
- producer,
- currentOffset,
- zoneId);
- });
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions.forEach(topicPartition ->
- {
- if (!topicPartition.topic().equals(topic))
- {
- log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
- return;
- }
-
- int partition = topicPartition.partition();
- // long unseenOffset = offsets[partition]; TODO: Offset merken...?
- });
- log.info("Revoked partitions: {}", partitions);
- }
-
- @Override
- public void onPartitionsLost(Collection<TopicPartition> partitions)
- {
- // TODO: Muss auf den Verlust anders reagiert werden?
- onPartitionsRevoked(partitions);
- }
-
- @Override
- public Mono<ChatRoom> getChatRoom(int shard, UUID id)
- {
- return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
- }
-
- @Override
- public Flux<ChatRoom> getChatRooms(int shard)
- {
- return Flux.fromStream(chatRoomMaps[shard].values().stream());
- }
-}
package de.juplo.kafka.chat.backend.persistence.kafka;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import java.util.UUID;
+@RequiredArgsConstructor
+@Slf4j
public class KafkaChatRoomFactory implements ChatRoomFactory
{
+ private final ChatRoomChannel chatRoomChannel;
+
@Override
- public Mono<ChatRoom> createChatRoom(UUID id, String name)
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
- return null;
+ log.info("Sending create-request for chat rooom: id={}, name={}");
+ return chatRoomChannel.sendCreateChatRoomRequest(id, name);
}
}
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
+import java.util.UUID;
+@RequiredArgsConstructor
@Slf4j
public class KafkaChatRoomService implements ChatRoomService
{
- private final Producer<String, MessageTo> producer;
- private final TopicPartition tp;
+ private final ChatMessageChannel chatMessageChannel;
+ private final UUID chatRoomId;
private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
- private volatile MessageHandlingStrategy strategy;
-
-
- public KafkaChatRoomService(
- Producer<String, MessageTo> producer,
- TopicPartition tp)
- {
- this.producer = producer;
- this.tp = tp;
- this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp.partition());
- }
-
@Override
- synchronized public Mono<Message> persistMessage(
+ public Mono<Message> persistMessage(
Message.MessageKey key,
LocalDateTime timestamp,
String text)
{
- return strategy.handleMessage(key, timestamp, text);
+ return chatMessageChannel
+ .sendMessage(chatRoomId, key, timestamp, text)
+ .doOnSuccess(message -> persistMessage(message));
}
- /**
- * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
- * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
- */
- protected void addMessage(Message message) throws MessageMutationException
+ void persistMessage(Message message)
{
- Message existingMessage = messages.get(message.getKey());
-
- // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
- // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
- // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
- // fängt dies bereits der ChatRoom ab.
- // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
- // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
- // doppelt aufschlägt...
- if (existingMessage == null)
- {
- messages.put(message.getKey(), message);
- }
- else
- {
- if (!existingMessage.getMessageText().equals(message.getMessageText()))
- {
- throw new MessageMutationException(existingMessage, message.getMessageText());
- }
-
- // Warn and emit existing message
- log.warn(
- "Keeping existing message with {}@{} for {}",
- existingMessage.getSerialNumber(),
- existingMessage.getTimestamp(),
- existingMessage.getKey());
- }
+ messages.put(message.getKey(), message);
}
@Override
synchronized public Mono<Message> getMessage(Message.MessageKey key)
{
- // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
- // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
return Mono.fromSupplier(() -> messages.get(key));
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.CompletableFuture;
+
+
+@ConditionalOnProperty(
+ prefix = "chat.backend",
+ name = "services",
+ havingValue = "kafka")
+@Component
+@Slf4j
+public class KafkaServicesApplicationRunner implements ApplicationRunner
+{
+ @Autowired
+ ThreadPoolTaskExecutor taskExecutor;
+ @Autowired
+ ConfigurableApplicationContext context;
+
+ @Autowired
+ ChatRoomChannel chatRoomChannel;
+ @Autowired
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer;
+ @Autowired
+ ChatMessageChannel chatMessageChannel;
+ @Autowired
+ Consumer<String, MessageTo> chatMessageChannelConsumer;
+
+ CompletableFuture<Void> chatRoomChannelConsumerJob;
+ CompletableFuture<Void> chatMessageChannelConsumerJob;
+
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception
+ {
+ log.info("Starting the consumer for the ChatRoomChannel");
+ chatRoomChannelConsumerJob = taskExecutor
+ .submitCompletable(chatRoomChannel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+ return null;
+ });
+ log.info("Starting the consumer for the ChatMessageChannel");
+ chatMessageChannelConsumerJob = taskExecutor
+ .submitCompletable(chatMessageChannel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
+ return null;
+ });
+ }
+
+ @PreDestroy
+ public void joinChatRoomChannelConsumerJob()
+ {
+ log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+ chatRoomChannelConsumer.wakeup();
+ log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+ chatRoomChannelConsumerJob.join();
+ log.info("Joined the consumer of the ChatRoomChannel");
+ }
+
+ @PreDestroy
+ public void joinChatMessageChannelConsumerJob()
+ {
+ log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+ chatMessageChannelConsumer.wakeup();
+ log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
+ chatMessageChannelConsumerJob.join();
+ log.info("Joined the consumer of the ChatMessageChannel");
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+
+@ConditionalOnProperty(
+ prefix = "chat.backend",
+ name = "services",
+ havingValue = "kafka")
+@Configuration
+public class KafkaServicesConfiguration
+{
+ @Bean
+ ChatHome kafkaChatHome(
+ ShardingStrategy shardingStrategy,
+ ChatMessageChannel chatMessageChannel)
+ {
+ return new KafkaChatHome(shardingStrategy, chatMessageChannel);
+ }
+
+ @Bean
+ KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
+ {
+ return new KafkaChatRoomFactory(chatRoomChannel);
+ }
+
+ @Bean
+ ChatRoomChannel chatRoomChannel(
+ ChatBackendProperties properties,
+ Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+ ShardingStrategy shardingStrategy,
+ ChatMessageChannel chatMessageChannel,
+ Clock clock)
+ {
+ return new ChatRoomChannel(
+ properties.getKafka().getChatroomChannelTopic(),
+ chatRoomChannelProducer,
+ chatRoomChannelConsumer,
+ shardingStrategy,
+ chatMessageChannel,
+ clock,
+ properties.getChatroomBufferSize());
+ }
+
+ @Bean
+ Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
+ Properties defaultProducerProperties,
+ ChatBackendProperties chatBackendProperties,
+ IntegerSerializer integerSerializer,
+ JsonSerializer<ChatRoomTo> chatRoomSerializer)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ProducerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
+ return new KafkaProducer<>(
+ properties,
+ integerSerializer,
+ chatRoomSerializer);
+ }
+
+ @Bean
+ IntegerSerializer integerSerializer()
+ {
+ return new IntegerSerializer();
+ }
+
+ @Bean
+ JsonSerializer<ChatRoomTo> chatRoomSerializer()
+ {
+ JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+ serializer.configure(
+ Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+ false);
+ return serializer;
+ }
+
+ @Bean
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
+ Properties defaultConsumerProperties,
+ ChatBackendProperties chatBackendProperties,
+ IntegerDeserializer integerDeserializer,
+ JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
+ properties.put(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "chat_room_channel");
+ return new KafkaConsumer<>(
+ properties,
+ integerDeserializer,
+ chatRoomDeserializer);
+ }
+
+ @Bean
+ IntegerDeserializer integerDeserializer()
+ {
+ return new IntegerDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+ {
+ JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+ deserializer.configure(
+ Map.of(
+ JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
+ JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
+ JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
+ false );
+ return deserializer;
+ }
+
+ @Bean
+ ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+ {
+ return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+ }
+
+ @Bean
+ ChatMessageChannel chatMessageChannel(
+ ChatBackendProperties properties,
+ Producer<String, MessageTo> chatMessageChannelProducer,
+ Consumer<String, MessageTo> chatMessageChannelConsumer,
+ ZoneId zoneId)
+ {
+ return new ChatMessageChannel(
+ properties.getKafka().getMessageChannelTopic(),
+ chatMessageChannelProducer,
+ chatMessageChannelConsumer,
+ zoneId,
+ properties.getKafka().getNumPartitions());
+ }
+
+ @Bean
+ Producer<String, MessageTo> chatMessageChannelProducer(
+ Properties defaultProducerProperties,
+ ChatBackendProperties chatBackendProperties,
+ StringSerializer stringSerializer,
+ JsonSerializer<MessageTo> messageSerializer)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ProducerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
+ return new KafkaProducer<>(
+ properties,
+ stringSerializer,
+ messageSerializer);
+ }
+
+ @Bean
+ StringSerializer stringSerializer()
+ {
+ return new StringSerializer();
+ }
+
+ @Bean
+ JsonSerializer<MessageTo> chatMessageSerializer()
+ {
+ JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+ serializer.configure(
+ Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+ false);
+ return serializer;
+ }
+
+ @Bean
+ Consumer<String, MessageTo> chatMessageChannelConsumer(
+ Properties defaultConsumerProperties,
+ ChatBackendProperties chatBackendProperties,
+ StringDeserializer stringDeserializer,
+ JsonDeserializer<MessageTo> messageDeserializer)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
+ properties.put(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "chat_message_channel");
+ return new KafkaConsumer<>(
+ properties,
+ stringDeserializer,
+ messageDeserializer);
+ }
+
+ @Bean
+ StringDeserializer stringDeserializer()
+ {
+ return new StringDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<MessageTo> chatMessageDeserializer()
+ {
+ JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+ deserializer.configure(
+ Map.of(
+ JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
+ JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
+ JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
+ false );
+ return deserializer;
+ }
+
+ @Bean
+ Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ return properties;
+ }
+
+ @Bean
+ Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ properties.setProperty(
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ "false");
+ properties.setProperty(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ "earliest");
+ return properties;
+ }
+
+ @Bean
+ ZoneId zoneId()
+ {
+ return ZoneId.systemDefault();
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+
+
+public class LoadInProgressException extends ShardNotOwnedException
+{
+ public LoadInProgressException()
+ {
+ this(-1);
+ }
+
+ public LoadInProgressException(int shard)
+ {
+ super(shard);
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-
-
-interface MessageHandlingStrategy
-{
- Mono<Message> handleMessage(
- Message.MessageKey key,
- LocalDateTime timestamp,
- String text);
-}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import java.util.UUID;
+
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.CHATROOMS_TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.MESSAGES_TOPIC;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "chat.backend.services=kafka",
+ "chat.backend.kafka.client-id-PREFIX=TEST",
+ "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "chat.backend.kafka.chatroom-channel-topic=" + CHATROOMS_TOPIC,
+ "chat.backend.kafka.message-channel-topic=" + MESSAGES_TOPIC,
+ "chat.backend.kafka.num-partitions=3" })
+@EmbeddedKafka(topics = { CHATROOMS_TOPIC, MESSAGES_TOPIC }, partitions = 3)
+class KafkaConfigurationIT extends AbstractConfigurationIT
+{
+ final static String CHATROOMS_TOPIC = "TEST_CHATROOM_CHANNEL";
+ final static String MESSAGES_TOPIC = "TEST_MESSAGE_CHANNEL";
+
+ @BeforeAll
+ public static void test(
+ @Autowired ShardingStrategy shardingStrategy,
+ @Autowired KafkaTemplate<String, String> messageTemplate,
+ @Autowired KafkaTemplate<Integer, String> chatRoomTemplate)
+ {
+ UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+ int shard = shardingStrategy.selectShard(chatRoomId);
+ chatRoomTemplate.send(CHATROOMS_TOPIC, null,"{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }");
+ messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }");
+ messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }");
+ messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }");
+ messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }");
+ }
+}