+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-
-@RequiredArgsConstructor
-public class AbstractMessageTo
-{
- public enum ToType {
- CREATE_CHATROOM_COMMAND,
- CHATMESSAGE_EVENT,
- }
-
- @Getter
- private final ToType type;
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.*;
-
-import java.time.LocalDateTime;
-
-
-@Getter
-@Setter
-@EqualsAndHashCode
-@ToString
-public class ChatMessageReceivedEventTo extends AbstractMessageTo
-{
- private String user;
- private Long id;
- private String text;
-
-
- public ChatMessageReceivedEventTo()
- {
- super(ToType.CHATMESSAGE_EVENT);
- }
-
-
- public Message toMessage(long offset, LocalDateTime timestamp)
- {
- return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
- }
-
- public static ChatMessageReceivedEventTo from(Message message)
- {
- return ChatMessageReceivedEventTo.of(
- message.getUsername(),
- message.getId(),
- message.getMessageText());
- }
-
-
- public static ChatMessageReceivedEventTo of(String user, Long id, String text)
- {
- ChatMessageReceivedEventTo to = new ChatMessageReceivedEventTo();
- to.user = user;
- to.id = id;
- to.text = text;
- return to;
- }
-}
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
UUID chatRoomId,
String name)
{
- CreateChatRoomCommandTo createChatRoomRequestTo = CreateChatRoomCommandTo.of(name);
+ CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
return Mono.create(sink ->
{
ProducerRecord<String, AbstractMessageTo> record =
null,
zdt.toEpochSecond(),
chatRoomId.toString(),
- ChatMessageReceivedEventTo.of(key.getUsername(), key.getMessageId(), text));
+ EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
producer.send(record, ((metadata, exception) ->
{
switch (record.value().getType())
{
- case CREATE_CHATROOM_COMMAND:
+ case COMMAND_CREATE_CHATROOM:
createChatRoom(
chatRoomId,
- (CreateChatRoomCommandTo) record.value(),
+ (CommandCreateChatRoomTo) record.value(),
record.partition());
break;
- case CHATMESSAGE_EVENT:
+ case EVENT_CHATMESSAGE_RECEIVED:
Instant instant = Instant.ofEpochSecond(record.timestamp());
LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
loadChatMessage(
chatRoomId,
timestamp,
record.offset(),
- (ChatMessageReceivedEventTo) record.value(),
+ (EventChatMessageReceivedTo) record.value(),
record.partition());
break;
void createChatRoom(
UUID chatRoomId,
- CreateChatRoomCommandTo createChatRoomRequestTo,
+ CommandCreateChatRoomTo createChatRoomRequestTo,
int partition)
{
log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
UUID chatRoomId,
LocalDateTime timestamp,
long offset,
- ChatMessageReceivedEventTo chatMessageTo,
+ EventChatMessageReceivedTo chatMessageTo,
int partition)
{
Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import lombok.*;
-
-
-@Getter
-@Setter
-@EqualsAndHashCode
-@ToString
-public class CreateChatRoomCommandTo extends AbstractMessageTo
-{
- private String name;
-
-
- public CreateChatRoomCommandTo()
- {
- super(ToType.CREATE_CHATROOM_COMMAND);
- }
-
-
- public static CreateChatRoomCommandTo of(String name)
- {
- CreateChatRoomCommandTo to = new CreateChatRoomCommandTo();
- to.name = name;
- return to;
- }
-}
public class KafkaChatHome implements ChatHome
{
private final KafkaLikeShardingStrategy shardingStrategy;
- private final ChatRoomChannel chatMessageChanel;
+ private final ChatRoomChannel chatRoomChannel;
@Override
public Mono<ChatRoom> getChatRoom(UUID id)
{
int shard = shardingStrategy.selectShard(id);
- if (chatMessageChanel.isLoadInProgress())
+ if (chatRoomChannel.isLoadInProgress())
{
throw new LoadInProgressException(shard);
}
else
{
- return chatMessageChanel.getChatRoom(shard, id);
+ return chatRoomChannel.getChatRoom(shard, id);
}
}
@Override
public Flux<ChatRoom> getChatRooms()
{
- return chatMessageChanel.getChatRooms();
+ return chatRoomChannel.getChatRooms();
}
}
@Override
public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
- log.info("Sending create-request for chat rooom: id={}, name={}");
+ log.info("Sending create-command for chat rooom: id={}, name={}");
return chatRoomChannel.sendCreateChatRoomRequest(id, name);
}
}
package de.juplo.kafka.chat.backend.persistence.kafka;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
properties.put(
ProducerConfig.CLIENT_ID_CONFIG,
- chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
return new KafkaProducer<>(
properties,
stringSerializer,
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
properties.put(
ConsumerConfig.CLIENT_ID_CONFIG,
- chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
properties.put(
ConsumerConfig.GROUP_ID_CONFIG,
"chat_message_channel");
String typeMappings ()
{
return
- "create:" + CreateChatRoomCommandTo.class.getCanonicalName() + "," +
- "message:" + ChatMessageReceivedEventTo.class.getCanonicalName();
+ "create:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+ "message:" + EventChatMessageReceivedTo.class.getCanonicalName();
}
@Bean
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+public class AbstractMessageTo
+{
+ public enum ToType {
+ COMMAND_CREATE_CHATROOM,
+ EVENT_CHATMESSAGE_RECEIVED,
+ }
+
+ @Getter
+ private final ToType type;
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import lombok.*;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class CommandCreateChatRoomTo extends AbstractMessageTo
+{
+ private String name;
+
+
+ public CommandCreateChatRoomTo()
+ {
+ super(ToType.COMMAND_CREATE_CHATROOM);
+ }
+
+
+ public static CommandCreateChatRoomTo of(String name)
+ {
+ CommandCreateChatRoomTo to = new CommandCreateChatRoomTo();
+ to.name = name;
+ return to;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+
+import java.time.LocalDateTime;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventChatMessageReceivedTo extends AbstractMessageTo
+{
+ private String user;
+ private Long id;
+ private String text;
+
+
+ public EventChatMessageReceivedTo()
+ {
+ super(ToType.EVENT_CHATMESSAGE_RECEIVED);
+ }
+
+
+ public Message toMessage(long offset, LocalDateTime timestamp)
+ {
+ return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
+ }
+
+ public static EventChatMessageReceivedTo from(Message message)
+ {
+ return EventChatMessageReceivedTo.of(
+ message.getUsername(),
+ message.getId(),
+ message.getMessageText());
+ }
+
+
+ public static EventChatMessageReceivedTo of(String user, Long id, String text)
+ {
+ EventChatMessageReceivedTo to = new EventChatMessageReceivedTo();
+ to.user = user;
+ to.id = id;
+ to.text = text;
+ return to;
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-
-public class ChatMessageReceivedEventToTest
-{
- final String json = """
- {
- "id": 1,
- "text": "Hallo, ich heiße Peter!",
- "user": "Peter"
- }""";
-
- ObjectMapper mapper;
-
- @BeforeEach
- public void setUp()
- {
- mapper = new ObjectMapper();
- mapper.registerModule(new JavaTimeModule());
- mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- }
-
- @Test
- public void testDeserialization() throws Exception
- {
- ChatMessageReceivedEventTo message = mapper.readValue(json, ChatMessageReceivedEventTo.class);
- assertThat(message.getId()).isEqualTo(1l);
- assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
- assertThat(message.getUser()).isEqualTo("Peter");
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-
-public class CreateChatRoomCommandToTest
-{
- final String json = """
- {
- "name": "Foo-Room!"
- }""";
-
- ObjectMapper mapper;
-
- @BeforeEach
- public void setUp()
- {
- mapper = new ObjectMapper();
- mapper.registerModule(new JavaTimeModule());
- mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- }
-
- @Test
- public void testDeserialization() throws Exception
- {
- CreateChatRoomCommandTo message = mapper.readValue(json, CreateChatRoomCommandTo.class);
- assertThat(message.getName()).isEqualTo("Foo-Room!");
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class CommandCreateChatRoomToTest
+{
+ final String json = """
+ {
+ "name": "Foo-Room!"
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class);
+ assertThat(message.getName()).isEqualTo("Foo-Room!");
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class EventChatMessageReceivedToTest
+{
+ final String json = """
+ {
+ "id": 1,
+ "text": "Hallo, ich heiße Peter!",
+ "user": "Peter"
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class);
+ assertThat(message.getId()).isEqualTo(1l);
+ assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+ assertThat(message.getUser()).isEqualTo("Peter");
+ }
+}