{
Mono<ChatRoom> getChatRoom(UUID id);
- Flux<ChatRoom> getChatRooms();
+ Flux<ChatRoomInfo> getChatRooms();
}
package de.juplo.kafka.chat.backend.persistence;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import reactor.core.publisher.Flux;
public interface StorageStrategy
{
- void write(Flux<ChatRoom> chatroomFlux);
+ void write(Flux<ChatRoomInfo> chatroomFlux);
Flux<ChatRoom> read();
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+public class AbstractTo
+{
+ public enum ToType {
+ CREATE_CHATROOM_REQUEST,
+ MESSAGE_SENT,
+ CHATROOM_INFO
+ }
+
+ @Getter
+ private final ToType type;
+}
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
{
private final String topic;
- private final Producer<String, MessageTo> producer;
- private final Consumer<String, MessageTo> consumer;
+ private final Producer<String, AbstractTo> producer;
+ private final Consumer<String, AbstractTo> consumer;
private final ZoneId zoneId;
private final int numShards;
+ private final int bufferSize;
+ private final Clock clock;
private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
private final Map<UUID, ChatRoom>[] chatrooms;
- private final KafkaLikeShardingStrategy shardingStrategy;
private boolean running;
@Getter
public ChatMessageChannel(
String topic,
- Producer<String, MessageTo> producer,
- Consumer<String, MessageTo> consumer,
+ Producer<String, AbstractTo> producer,
+ Consumer<String, AbstractTo> consumer,
ZoneId zoneId,
- int numShards)
+ int numShards,
+ int bufferSize,
+ Clock clock)
{
log.debug(
"Creating ChatMessageChannel for topic {} with {} partitions",
this.producer = producer;
this.zoneId = zoneId;
this.numShards = numShards;
+ this.bufferSize = bufferSize;
+ this.clock = clock;
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
IntStream
.range(0, numShards)
.forEach(shard -> this.chatrooms[shard] = new HashMap<>());
- this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
- Mono<Message> sendMessage(
+
+ Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+ UUID chatRoomId,
+ String name)
+ {
+ CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(name);
+ return Mono.create(sink ->
+ {
+ ProducerRecord<String, CreateChatRoomRequestTo> record =
+ new ProducerRecord<>(
+ topic,
+ chatRoomId.toString(),
+ createChatRoomRequestTo);
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+ ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition());
+ createChatRoom(chatRoomInfo);
+ sink.success(chatRoomInfo);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send create-request for chat room (id={}, name={}): {}",
+ chatRoomId,
+ name,
+ exception);
+ sink.error(exception);
+ }
+ }));
+ });
+ }
+
+ Mono<Message> sendChatMessage(
UUID chatRoomId,
Message.MessageKey key,
LocalDateTime timestamp,
String text)
{
- int shard = this.shardingStrategy.selectShard(chatRoomId);
- TopicPartition tp = new TopicPartition(topic, shard);
ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
return Mono.create(sink ->
{
- ProducerRecord<String, MessageTo> record =
+ ProducerRecord<String, AbstractTo> record =
new ProducerRecord<>(
- tp.topic(),
- tp.partition(),
+ topic,
+ null,
zdt.toEpochSecond(),
chatRoomId.toString(),
- MessageTo.of(key.getUsername(), key.getMessageId(), text));
+ ChatMessageTo.of(key.getUsername(), key.getMessageId(), text));
producer.send(record, ((metadata, exception) ->
{
{
try
{
- ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
if (loadInProgress)
{
- loadMessages(records);
+ loadChatRoom(records);
if (isLoadingCompleted())
{
log.info("Exiting normally");
}
- void loadMessages(ConsumerRecords<String, MessageTo> records)
+ void loadChatRoom(ConsumerRecords<String, AbstractTo> records)
{
- for (ConsumerRecord<String, MessageTo> record : records)
+ for (ConsumerRecord<String, AbstractTo> record : records)
{
- nextOffset[record.partition()] = record.offset() + 1;
UUID chatRoomId = UUID.fromString(record.key());
- MessageTo messageTo = record.value();
-
- Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
- Instant instant = Instant.ofEpochSecond(record.timestamp());
- LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
-
- Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
-
- ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
- if (chatRoom == null)
+ switch (record.value().getType())
{
- // Alles pausieren und erst von putChatRoom wieder resumen lassen!
+ case CREATE_CHATROOM_REQUEST:
+ createChatRoom(
+ chatRoomId,
+ (CreateChatRoomRequestTo) record.value(),
+ record.partition());
+ break;
+
+ case MESSAGE_SENT:
+ Instant instant = Instant.ofEpochSecond(record.timestamp());
+ LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+ loadChatMessage(
+ chatRoomId,
+ timestamp,
+ record.offset(),
+ (ChatMessageTo) record.value(),
+ record.partition());
+ break;
+
+ default:
+ log.debug(
+ "Ignoring message for chat-room {} with offset {}: {}",
+ chatRoomId,
+ record.offset(),
+ record.value());
}
- KafkaChatRoomService kafkaChatRoomService =
- (KafkaChatRoomService) chatRoom.getChatRoomService();
- kafkaChatRoomService.persistMessage(message);
+ nextOffset[record.partition()] = record.offset() + 1;
}
}
+ void createChatRoom(
+ UUID chatRoomId,
+ CreateChatRoomRequestTo createChatRoomRequestTo,
+ int partition)
+ {
+ putChatRoom(ChatRoomInfo.of(
+ chatRoomId,
+ createChatRoomRequestTo.getName(),
+ partition));
+ }
+
+
+ void createChatRoom(ChatRoomInfo chatRoomInfo)
+ {
+ UUID id = chatRoomInfo.getId();
+ String name = chatRoomInfo.getName();
+ int shard = chatRoomInfo.getShard();
+ log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(this, id);
+ ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+ putChatRoom(chatRoom);
+ }
+
+ void loadChatMessage(
+ UUID chatRoomId,
+ LocalDateTime timestamp,
+ long offset,
+ ChatMessageTo chatMessageTo,
+ int partition)
+ {
+ Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
+ Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
+
+ ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
+ KafkaChatRoomService kafkaChatRoomService =
+ (KafkaChatRoomService) chatRoom.getChatRoomService();
+
+ kafkaChatRoomService.persistMessage(message);
+ }
+
boolean isLoadingCompleted()
{
return IntStream
}
- void putChatRoom(ChatRoom chatRoom)
+ private void putChatRoom(ChatRoom chatRoom)
{
Integer partition = chatRoom.getShard();
UUID chatRoomId = chatRoom.getId();
{
return Mono.justOrEmpty(chatrooms[shard].get(id));
}
-
- Flux<ChatRoom> getChatRooms()
- {
- return Flux.fromStream(IntStream
- .range(0, numShards)
- .filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> Integer.valueOf(shard))
- .flatMap(shard -> chatrooms[shard].values().stream()));
- }
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+
+@Data
+public class ChatMessageTo extends AbstractTo
+{
+ private String user;
+ private Long id;
+ private String text;
+
+
+ public ChatMessageTo()
+ {
+ super(ToType.MESSAGE_SENT);
+ }
+
+
+ public Message toMessage(long offset, LocalDateTime timestamp)
+ {
+ return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
+ }
+
+ public static ChatMessageTo from(Message message)
+ {
+ return ChatMessageTo.of(
+ message.getUsername(),
+ message.getId(),
+ message.getMessageText());
+ }
+
+
+ public static ChatMessageTo of(String user, Long id, String text)
+ {
+ ChatMessageTo to = new ChatMessageTo();
+ to.user = user;
+ to.id = id;
+ to.text = text;
+ return to;
+ }
+}
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.*;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.stream.IntStream;
@RequiredArgsConstructor
public class ChatRoomChannel implements Runnable
{
private final String topic;
- private final Producer<Integer, ChatRoomTo> producer;
- private final Consumer<Integer, ChatRoomTo> consumer;
- private final ShardingStrategy shardingStrategy;
- private final ChatMessageChannel chatMessageChannel;
- private final Clock clock;
- private final int bufferSize;
+ private final Consumer<String, AbstractTo> consumer;
+ private final Map<UUID, ChatRoomInfo> chatrooms = new HashMap<>();
private boolean running;
- Mono<ChatRoomInfo> sendCreateChatRoomRequest(
- UUID chatRoomId,
- String name)
- {
- int shard = this.shardingStrategy.selectShard(chatRoomId);
- ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId.toString(), name, shard);
- return Mono.create(sink ->
- {
- ProducerRecord<Integer, ChatRoomTo> record =
- new ProducerRecord<>(
- topic,
- shard,
- chatRoomTo);
-
- producer.send(record, ((metadata, exception) ->
- {
- if (metadata != null)
- {
- log.info("Successfully send chreate-request for chat room: {}", chatRoomTo);
- sink.success(chatRoomTo.toChatRoomInfo());
- }
- else
- {
- // On send-failure
- log.error(
- "Could not send create-request for chat room (id={}, name={}): {}",
- chatRoomId,
- name,
- exception);
- sink.error(exception);
- }
- }));
- });
- }
-
@Override
public void run()
{
{
try
{
- ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
- for (ConsumerRecord<Integer, ChatRoomTo> record : records)
+ for (ConsumerRecord<String, AbstractTo> record : records)
{
- createChatRoom(record.value().toChatRoomInfo());
+ switch (record.value().getType())
+ {
+ case CHATROOM_INFO:
+ createChatRoom((ChatRoomInfoTo) record.value());
+ break;
+
+ default:
+ log.debug(
+ "Ignoring message for key {} with offset {}: {}",
+ record.key(),
+ record.offset(),
+ record.value());
+ }
}
}
catch (WakeupException e)
}
- void createChatRoom(ChatRoomInfo chatRoomInfo)
+ void createChatRoom(ChatRoomInfoTo chatRoomInfoTo)
+ {
+ ChatRoomInfo chatRoomInfo = chatRoomInfoTo.toChatRoomInfo();
+ chatrooms.put(chatRoomInfo.getId(), chatRoomInfo);
+ }
+
+ Flux<ChatRoomInfo> getChatRooms()
{
- UUID id = chatRoomInfo.getId();
- String name = chatRoomInfo.getName();
- int shard = chatRoomInfo.getShard();
- log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
- KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
- ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
- chatMessageChannel.putChatRoom(chatRoom);
+ return Flux.fromIterable(chatrooms.values());
}
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.Data;
+
+import java.util.UUID;
+
+
+@Data
+public class ChatRoomInfoTo extends AbstractTo
+{
+ private String id;
+ private String name;
+ private int shard;
+
+
+ public ChatRoomInfoTo()
+ {
+ super(ToType.CHATROOM_INFO);
+ }
+
+
+ public ChatRoomInfo toChatRoomInfo()
+ {
+ return new ChatRoomInfo(UUID.fromString(id), name, shard);
+ }
+
+ public static ChatRoomInfoTo from(ChatRoom chatRoom)
+ {
+ return ChatRoomInfoTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
+ }
+
+ public static ChatRoomInfoTo of(String id, String name, int shard)
+ {
+ ChatRoomInfoTo to = new ChatRoomInfoTo();
+ to.id = id;
+ to.name = name;
+ to.shard = shard;
+ return to;
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.util.UUID;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class ChatRoomTo
-{
- private String id;
- private String name;
- private int shard;
-
- public ChatRoomInfo toChatRoomInfo()
- {
- return new ChatRoomInfo(UUID.fromString(id), name, shard);
- }
-
- public static ChatRoomTo from(ChatRoom chatRoom)
- {
- return ChatRoomTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.Data;
+
+import java.util.UUID;
+
+
+@Data
+public class CreateChatRoomRequestTo extends AbstractTo
+{
+ private String name;
+
+
+ public CreateChatRoomRequestTo()
+ {
+ super(ToType.CREATE_CHATROOM_REQUEST);
+ }
+
+
+ public static CreateChatRoomRequestTo of(String name)
+ {
+ CreateChatRoomRequestTo to = new CreateChatRoomRequestTo();
+ to.name = name;
+ return to;
+ }
+}
import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
public class KafkaChatHome implements ChatHome
{
private final ShardingStrategy shardingStrategy;
+ private final ChatRoomChannel chatRoomChannel;
private final ChatMessageChannel chatMessageChanel;
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Flux<ChatRoomInfo> getChatRooms()
{
- if (chatMessageChanel.isLoadInProgress())
- {
- throw new LoadInProgressException();
- }
- else
- {
- return chatMessageChanel.getChatRooms();
- }
+ return chatRoomChannel.getChatRooms();
}
}
String text)
{
return chatMessageChannel
- .sendMessage(chatRoomId, key, timestamp, text)
+ .sendChatMessage(chatRoomId, key, timestamp, text)
.doOnSuccess(message -> persistMessage(message));
}
@Autowired
ChatRoomChannel chatRoomChannel;
@Autowired
- Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer;
+ Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer;
@Autowired
ChatMessageChannel chatMessageChannel;
@Autowired
- Consumer<String, MessageTo> chatMessageChannelConsumer;
+ Consumer<String, ChatMessageTo> chatMessageChannelConsumer;
CompletableFuture<Void> chatRoomChannelConsumerJob;
CompletableFuture<Void> chatMessageChannelConsumerJob;
@Bean
ChatRoomChannel chatRoomChannel(
ChatBackendProperties properties,
- Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
- Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+ Producer<Integer, CreateChatRoomRequestTo> chatRoomChannelProducer,
+ Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer,
ShardingStrategy shardingStrategy,
ChatMessageChannel chatMessageChannel,
Clock clock)
}
@Bean
- Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
+ Producer<Integer, CreateChatRoomRequestTo> chatRoomChannelProducer(
Properties defaultProducerProperties,
ChatBackendProperties chatBackendProperties,
IntegerSerializer integerSerializer,
- JsonSerializer<ChatRoomTo> chatRoomSerializer)
+ JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer)
{
Map<String, Object> properties = new HashMap<>();
defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
}
@Bean
- JsonSerializer<ChatRoomTo> chatRoomSerializer()
+ JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer()
{
- JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+ JsonSerializer<CreateChatRoomRequestTo> serializer = new JsonSerializer<>();
serializer.configure(
Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
false);
}
@Bean
- Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
+ Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer(
Properties defaultConsumerProperties,
ChatBackendProperties chatBackendProperties,
IntegerDeserializer integerDeserializer,
- JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+ JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer)
{
Map<String, Object> properties = new HashMap<>();
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
}
@Bean
- JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+ JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer()
{
- JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+ JsonDeserializer<CreateChatRoomRequestTo> deserializer = new JsonDeserializer<>();
deserializer.configure(
Map.of(
JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
- JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
+ JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class,
JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
false );
return deserializer;
@Bean
ChatMessageChannel chatMessageChannel(
ChatBackendProperties properties,
- Producer<String, MessageTo> chatMessageChannelProducer,
- Consumer<String, MessageTo> chatMessageChannelConsumer,
+ Producer<String, AbstractTo> chatMessageChannelProducer,
+ Consumer<String, AbstractTo> chatMessageChannelConsumer,
ZoneId zoneId)
{
return new ChatMessageChannel(
}
@Bean
- Producer<String, MessageTo> chatMessageChannelProducer(
+ Producer<String, AbstractTo> chatMessageChannelProducer(
Properties defaultProducerProperties,
ChatBackendProperties chatBackendProperties,
StringSerializer stringSerializer,
- JsonSerializer<MessageTo> messageSerializer)
+ JsonSerializer<AbstractTo> messageSerializer)
{
Map<String, Object> properties = new HashMap<>();
defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
}
@Bean
- JsonSerializer<MessageTo> chatMessageSerializer()
+ JsonSerializer<AbstractTo> chatMessageSerializer()
{
- JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+ JsonSerializer<AbstractTo> serializer = new JsonSerializer<>();
serializer.configure(
- Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+ Map.of(JsonSerializer.TYPE_MAPPINGS,
+ "create:" + CreateChatRoomRequestTo.class.getCanonicalName() + "," +
+ "message:" + ChatMessageTo.class.getCanonicalName()),
false);
return serializer;
}
@Bean
- Consumer<String, MessageTo> chatMessageChannelConsumer(
+ Consumer<String, ChatMessageTo> chatMessageChannelConsumer(
Properties defaultConsumerProperties,
ChatBackendProperties chatBackendProperties,
StringDeserializer stringDeserializer,
- JsonDeserializer<MessageTo> messageDeserializer)
+ JsonDeserializer<ChatMessageTo> messageDeserializer)
{
Map<String, Object> properties = new HashMap<>();
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
}
@Bean
- JsonDeserializer<MessageTo> chatMessageDeserializer()
+ JsonDeserializer<ChatMessageTo> chatMessageDeserializer()
{
- JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+ JsonDeserializer<ChatMessageTo> deserializer = new JsonDeserializer<>();
deserializer.configure(
Map.of(
JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
- JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
+ JsonDeserializer.VALUE_DEFAULT_TYPE, ChatMessageTo.class,
JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
false );
return deserializer;
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.time.LocalDateTime;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class MessageTo
-{
- private String user;
- private Long id;
- private String text;
-
- public Message toMessage(long offset, LocalDateTime timestamp)
- {
- return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
- }
-
- public static MessageTo from(Message message)
- {
- return
- new MessageTo(
- message.getUsername(),
- message.getId(),
- message.getMessageText());
- }
-}
package de.juplo.kafka.chat.backend;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
{
UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
int shard = shardingStrategy.selectShard(chatRoomId);
- chatRoomTemplate.send(CHATROOMS_TOPIC, null,"{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }");
- messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }");
- messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }");
- messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }");
- messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }");
+ send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message");
+ }
+
+ static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+ {
+ ProducerRecord<String, String> record = new ProducerRecord<>(MESSAGES_TOPIC, key, value);
+ record.headers().add("__TypeId__", typeId.getBytes());
+ kafkaTemplate.send(record);
}
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class ChatMessageToTest
+{
+ final String json = """
+ {
+ "id": 1,
+ "text": "Hallo, ich heiße Peter!",
+ "user": "Peter"
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ ChatMessageTo message = mapper.readValue(json, ChatMessageTo.class);
+ assertThat(message.getId()).isEqualTo(1l);
+ assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+ assertThat(message.getUser()).isEqualTo("Peter");
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class ChatRoomInfoToTest
+{
+ final String json = """
+ {
+ "id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+ "name": "Foo-Room!",
+ "shard": 666
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ ChatRoomInfoTo message = mapper.readValue(json, ChatRoomInfoTo.class);
+ assertThat(message.getId()).isEqualTo("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+ assertThat(message.getName()).isEqualTo("Foo-Room!");
+ assertThat(message.getShard()).isEqualTo(666);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class CreateChatRoomRequestToTest
+{
+ final String json = """
+ {
+ "id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+ "name": "Foo-Room!",
+ "shard": 666
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ CreateChatRoomRequestTo message = mapper.readValue(json, CreateChatRoomRequestTo.class);
+ assertThat(message.getId()).isEqualTo("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+ assertThat(message.getName()).isEqualTo("Foo-Room!");
+ assertThat(message.getShard()).isEqualTo(666);
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-
-public class MessageToTest
-{
- final String json = """
- {
- "id": 1,
- "text": "Hallo, ich heiße Peter!",
- "user": "Peter"
- }""";
-
- ObjectMapper mapper;
-
- @BeforeEach
- public void setUp()
- {
- mapper = new ObjectMapper();
- mapper.registerModule(new JavaTimeModule());
- mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- }
-
- @Test
- public void testDeserialization() throws Exception
- {
- MessageTo message = mapper.readValue(json, MessageTo.class);
- assertThat(message.getId()).isEqualTo(1l);
- assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
- assertThat(message.getUser()).isEqualTo("Peter");
- }
-}