@RequiredArgsConstructor
public class AbstractTo
{
- public enum ToType { MESSAGE_SENT, CREATE_CHATROOM_REQUEST };
+ public enum ToType {
+ CREATE_CHATROOM_REQUEST,
+ MESSAGE_SENT,
+ CHATROOM_INFO
+ }
@Getter
private final ToType type;
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
import lombok.Getter;
}
- Mono<Message> sendMessage(
+
+ Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+ UUID chatRoomId,
+ String name)
+ {
+ CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(name);
+ return Mono.create(sink ->
+ {
+ ProducerRecord<String, CreateChatRoomRequestTo> record =
+ new ProducerRecord<>(
+ topic,
+ chatRoomId.toString(),
+ createChatRoomRequestTo);
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+ ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition());
+ createChatRoom(chatRoomInfo);
+ sink.success(chatRoomInfo);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send create-request for chat room (id={}, name={}): {}",
+ chatRoomId,
+ name,
+ exception);
+ sink.error(exception);
+ }
+ }));
+ });
+ }
+
+ Mono<Message> sendChatMessage(
UUID chatRoomId,
Message.MessageKey key,
LocalDateTime timestamp,
{
for (ConsumerRecord<String, AbstractTo> record : records)
{
+ UUID chatRoomId = UUID.fromString(record.key());
+
switch (record.value().getType())
{
case CREATE_CHATROOM_REQUEST:
- createChatRoom((CreateChatRoomRequestTo) record.value());
+ createChatRoom(
+ chatRoomId,
+ (CreateChatRoomRequestTo) record.value(),
+ record.partition());
break;
case MESSAGE_SENT:
- UUID chatRoomId = UUID.fromString(record.key());
Instant instant = Instant.ofEpochSecond(record.timestamp());
LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
loadChatMessage(
}
void createChatRoom(
+ UUID chatRoomId,
CreateChatRoomRequestTo createChatRoomRequestTo,
int partition)
{
- chatrooms[partition].put
+ putChatRoom(ChatRoomInfo.of(
+ chatRoomId,
+ createChatRoomRequestTo.getName(),
+ partition));
+ }
+
+
+ void createChatRoom(ChatRoomInfo chatRoomInfo)
+ {
+ UUID id = chatRoomInfo.getId();
+ String name = chatRoomInfo.getName();
+ int shard = chatRoomInfo.getShard();
+ log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(this, id);
+ ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+ putChatRoom(chatRoom);
}
void loadChatMessage(
}
- void putChatRoom(ChatRoom chatRoom)
+ private void putChatRoom(ChatRoom chatRoom)
{
Integer partition = chatRoom.getShard();
UUID chatRoomId = chatRoom.getId();
private boolean running;
- Mono<ChatRoomInfo> sendCreateChatRoomRequest(
- UUID chatRoomId,
- String name)
- {
- int shard = this.shardingStrategy.selectShard(chatRoomId);
- CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(chatRoomId.toString(), name, shard);
- return Mono.create(sink ->
- {
- ProducerRecord<Integer, CreateChatRoomRequestTo> record =
- new ProducerRecord<>(
- topic,
- shard,
- createChatRoomRequestTo);
-
- producer.send(record, ((metadata, exception) ->
- {
- if (metadata != null)
- {
- log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
- sink.success(createChatRoomRequestTo.toChatRoomInfo());
- }
- else
- {
- // On send-failure
- log.error(
- "Could not send create-request for chat room (id={}, name={}): {}",
- chatRoomId,
- name,
- exception);
- sink.error(exception);
- }
- }));
- });
- }
-
@Override
public void run()
{
@Data
public class CreateChatRoomRequestTo extends AbstractTo
{
- private String id;
private String name;
- private int shard;
public CreateChatRoomRequestTo()
}
- public ChatRoomInfo toChatRoomInfo()
- {
- return new ChatRoomInfo(UUID.fromString(id), name, shard);
- }
-
- public static CreateChatRoomRequestTo from(ChatRoom chatRoom)
- {
- return CreateChatRoomRequestTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
- }
-
- public static CreateChatRoomRequestTo of(String id, String name, int shard)
+ public static CreateChatRoomRequestTo of(String name)
{
CreateChatRoomRequestTo to = new CreateChatRoomRequestTo();
- to.id = id;
to.name = name;
- to.shard = shard;
return to;
}
}
String text)
{
return chatMessageChannel
- .sendMessage(chatRoomId, key, timestamp, text)
+ .sendChatMessage(chatRoomId, key, timestamp, text)
.doOnSuccess(message -> persistMessage(message));
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.Data;
+
+import java.util.UUID;
+
+
+@Data
+public class ChatRoomInfoTo extends AbstractTo
+{
+ private String id;
+ private String name;
+ private int shard;
+
+
+ public ChatRoomInfoTo()
+ {
+ super(ToType.CHATROOM_INFO);
+ }
+
+
+ public ChatRoomInfo toChatRoomInfo()
+ {
+ return new ChatRoomInfo(UUID.fromString(id), name, shard);
+ }
+
+ public static ChatRoomInfoTo from(ChatRoom chatRoom)
+ {
+ return ChatRoomInfoTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
+ }
+
+ public static ChatRoomInfoTo of(String id, String name, int shard)
+ {
+ ChatRoomInfoTo to = new ChatRoomInfoTo();
+ to.id = id;
+ to.name = name;
+ to.shard = shard;
+ return to;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class ChatRoomInfoToTest
+{
+ final String json = """
+ {
+ "id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+ "name": "Foo-Room!",
+ "shard": 666
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ ChatRoomInfoTo message = mapper.readValue(json, ChatRoomInfoTo.class);
+ assertThat(message.getId()).isEqualTo("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+ assertThat(message.getName()).isEqualTo("Foo-Room!");
+ assertThat(message.getShard()).isEqualTo(666);
+ }
+}