--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+public class AbstractTo
+{
+ public enum ToType { MESSAGE_SENT, CREATE_CHATROOM_REQUEST };
+
+ @Getter
+ private final ToType type;
+}
public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
{
private final String topic;
- private final Producer<String, MessageTo> producer;
- private final Consumer<String, MessageTo> consumer;
+ private final Producer<String, AbstractTo> producer;
+ private final Consumer<String, AbstractTo> consumer;
private final ZoneId zoneId;
private final int numShards;
private final boolean[] isShardOwned;
public ChatMessageChannel(
String topic,
- Producer<String, MessageTo> producer,
- Consumer<String, MessageTo> consumer,
+ Producer<String, AbstractTo> producer,
+ Consumer<String, AbstractTo> consumer,
ZoneId zoneId,
int numShards)
{
ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
return Mono.create(sink ->
{
- ProducerRecord<String, MessageTo> record =
+ ProducerRecord<String, ChatMessageTo> record =
new ProducerRecord<>(
tp.topic(),
tp.partition(),
zdt.toEpochSecond(),
chatRoomId.toString(),
- MessageTo.of(key.getUsername(), key.getMessageId(), text));
+ ChatMessageTo.of(key.getUsername(), key.getMessageId(), text));
producer.send(record, ((metadata, exception) ->
{
{
try
{
- ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
if (loadInProgress)
{
- loadMessages(records);
+ loadChatRoom(records);
if (isLoadingCompleted())
{
log.info("Exiting normally");
}
- void loadMessages(ConsumerRecords<String, MessageTo> records)
+ void loadChatRoom(ConsumerRecords<String, AbstractTo> records)
{
- for (ConsumerRecord<String, MessageTo> record : records)
+ for (ConsumerRecord<String, AbstractTo> record : records)
{
- nextOffset[record.partition()] = record.offset() + 1;
- UUID chatRoomId = UUID.fromString(record.key());
- MessageTo messageTo = record.value();
+ switch (record.value().getType())
+ {
+ case CREATE_CHATROOM_REQUEST:
+ createChatRoom((CreateChatRoomRequestTo) record.value());
+ break;
+
+ case MESSAGE_SENT:
+ UUID chatRoomId = UUID.fromString(record.key());
+ Instant instant = Instant.ofEpochSecond(record.timestamp());
+ LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+ loadChatMessage(
+ chatRoomId,
+ timestamp,
+ record.offset(),
+ (ChatMessageTo) record.value(),
+ record.partition());
+ break;
+ }
- Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
+ nextOffset[record.partition()] = record.offset() + 1;
+ }
+ }
- Instant instant = Instant.ofEpochSecond(record.timestamp());
- LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+ void createChatRoom(
+ CreateChatRoomRequestTo createChatRoomRequestTo,
+ int partition)
+ {
+ chatrooms[partition].put
+ }
- Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
+ void loadChatMessage(
+ UUID chatRoomId,
+ LocalDateTime timestamp,
+ long offset,
+ ChatMessageTo chatMessageTo,
+ int partition)
+ {
+ Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
+ Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
- ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
- if (chatRoom == null)
- {
- // TODO: Alles pausieren und erst von putChatRoom wieder resumen lassen!
- }
- KafkaChatRoomService kafkaChatRoomService =
- (KafkaChatRoomService) chatRoom.getChatRoomService();
+ ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
+ KafkaChatRoomService kafkaChatRoomService =
+ (KafkaChatRoomService) chatRoom.getChatRoomService();
- kafkaChatRoomService.persistMessage(message);
- }
+ kafkaChatRoomService.persistMessage(message);
}
boolean isLoadingCompleted()
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+
+@Data
+public class ChatMessageTo extends AbstractTo
+{
+ private String user;
+ private Long id;
+ private String text;
+
+
+ public ChatMessageTo()
+ {
+ super(ToType.MESSAGE_SENT);
+ }
+
+
+ public Message toMessage(long offset, LocalDateTime timestamp)
+ {
+ return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
+ }
+
+ public static ChatMessageTo from(Message message)
+ {
+ return ChatMessageTo.of(
+ message.getUsername(),
+ message.getId(),
+ message.getMessageText());
+ }
+
+
+ public static ChatMessageTo of(String user, Long id, String text)
+ {
+ ChatMessageTo to = new ChatMessageTo();
+ to.user = user;
+ to.id = id;
+ to.text = text;
+ return to;
+ }
+}
public class ChatRoomChannel implements Runnable
{
private final String topic;
- private final Producer<Integer, ChatRoomTo> producer;
- private final Consumer<Integer, ChatRoomTo> consumer;
+ private final Producer<Integer, CreateChatRoomRequestTo> producer;
+ private final Consumer<Integer, CreateChatRoomRequestTo> consumer;
private final ShardingStrategy shardingStrategy;
private final ChatMessageChannel chatMessageChannel;
private final Clock clock;
String name)
{
int shard = this.shardingStrategy.selectShard(chatRoomId);
- ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId.toString(), name, shard);
+ CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(chatRoomId.toString(), name, shard);
return Mono.create(sink ->
{
- ProducerRecord<Integer, ChatRoomTo> record =
+ ProducerRecord<Integer, CreateChatRoomRequestTo> record =
new ProducerRecord<>(
topic,
shard,
- chatRoomTo);
+ createChatRoomRequestTo);
producer.send(record, ((metadata, exception) ->
{
if (metadata != null)
{
- log.info("Successfully send chreate-request for chat room: {}", chatRoomTo);
- sink.success(chatRoomTo.toChatRoomInfo());
+ log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+ sink.success(createChatRoomRequestTo.toChatRoomInfo());
}
else
{
{
try
{
- ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<Integer, CreateChatRoomRequestTo> records = consumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
- for (ConsumerRecord<Integer, ChatRoomTo> record : records)
+ for (ConsumerRecord<Integer, CreateChatRoomRequestTo> record : records)
{
createChatRoom(record.value().toChatRoomInfo());
}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.util.UUID;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class ChatRoomTo
-{
- private String id;
- private String name;
- private int shard;
-
- public ChatRoomInfo toChatRoomInfo()
- {
- return new ChatRoomInfo(UUID.fromString(id), name, shard);
- }
-
- public static ChatRoomTo from(ChatRoom chatRoom)
- {
- return ChatRoomTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.Data;
+
+import java.util.UUID;
+
+
+@Data
+public class CreateChatRoomRequestTo extends AbstractTo
+{
+ private String id;
+ private String name;
+ private int shard;
+
+
+ public CreateChatRoomRequestTo()
+ {
+ super(ToType.CREATE_CHATROOM_REQUEST);
+ }
+
+
+ public ChatRoomInfo toChatRoomInfo()
+ {
+ return new ChatRoomInfo(UUID.fromString(id), name, shard);
+ }
+
+ public static CreateChatRoomRequestTo from(ChatRoom chatRoom)
+ {
+ return CreateChatRoomRequestTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
+ }
+
+ public static CreateChatRoomRequestTo of(String id, String name, int shard)
+ {
+ CreateChatRoomRequestTo to = new CreateChatRoomRequestTo();
+ to.id = id;
+ to.name = name;
+ to.shard = shard;
+ return to;
+ }
+}
@Autowired
ChatRoomChannel chatRoomChannel;
@Autowired
- Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer;
+ Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer;
@Autowired
ChatMessageChannel chatMessageChannel;
@Autowired
- Consumer<String, MessageTo> chatMessageChannelConsumer;
+ Consumer<String, ChatMessageTo> chatMessageChannelConsumer;
CompletableFuture<Void> chatRoomChannelConsumerJob;
CompletableFuture<Void> chatMessageChannelConsumerJob;
@Bean
ChatRoomChannel chatRoomChannel(
ChatBackendProperties properties,
- Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
- Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+ Producer<Integer, CreateChatRoomRequestTo> chatRoomChannelProducer,
+ Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer,
ShardingStrategy shardingStrategy,
ChatMessageChannel chatMessageChannel,
Clock clock)
}
@Bean
- Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
+ Producer<Integer, CreateChatRoomRequestTo> chatRoomChannelProducer(
Properties defaultProducerProperties,
ChatBackendProperties chatBackendProperties,
IntegerSerializer integerSerializer,
- JsonSerializer<ChatRoomTo> chatRoomSerializer)
+ JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer)
{
Map<String, Object> properties = new HashMap<>();
defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
}
@Bean
- JsonSerializer<ChatRoomTo> chatRoomSerializer()
+ JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer()
{
- JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+ JsonSerializer<CreateChatRoomRequestTo> serializer = new JsonSerializer<>();
serializer.configure(
Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
false);
}
@Bean
- Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
+ Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer(
Properties defaultConsumerProperties,
ChatBackendProperties chatBackendProperties,
IntegerDeserializer integerDeserializer,
- JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+ JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer)
{
Map<String, Object> properties = new HashMap<>();
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
}
@Bean
- JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+ JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer()
{
- JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+ JsonDeserializer<CreateChatRoomRequestTo> deserializer = new JsonDeserializer<>();
deserializer.configure(
Map.of(
JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
- JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
+ JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class,
JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
false );
return deserializer;
@Bean
ChatMessageChannel chatMessageChannel(
ChatBackendProperties properties,
- Producer<String, MessageTo> chatMessageChannelProducer,
- Consumer<String, MessageTo> chatMessageChannelConsumer,
+ Producer<String, AbstractTo> chatMessageChannelProducer,
+ Consumer<String, AbstractTo> chatMessageChannelConsumer,
ZoneId zoneId)
{
return new ChatMessageChannel(
}
@Bean
- Producer<String, MessageTo> chatMessageChannelProducer(
+ Producer<String, AbstractTo> chatMessageChannelProducer(
Properties defaultProducerProperties,
ChatBackendProperties chatBackendProperties,
StringSerializer stringSerializer,
- JsonSerializer<MessageTo> messageSerializer)
+ JsonSerializer<AbstractTo> messageSerializer)
{
Map<String, Object> properties = new HashMap<>();
defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
}
@Bean
- JsonSerializer<MessageTo> chatMessageSerializer()
+ JsonSerializer<AbstractTo> chatMessageSerializer()
{
- JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+ JsonSerializer<AbstractTo> serializer = new JsonSerializer<>();
serializer.configure(
- Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+ Map.of(JsonSerializer.TYPE_MAPPINGS,
+ "create:" + CreateChatRoomRequestTo.class.getCanonicalName() + "," +
+ "message:" + ChatMessageTo.class.getCanonicalName()),
false);
return serializer;
}
@Bean
- Consumer<String, MessageTo> chatMessageChannelConsumer(
+ Consumer<String, ChatMessageTo> chatMessageChannelConsumer(
Properties defaultConsumerProperties,
ChatBackendProperties chatBackendProperties,
StringDeserializer stringDeserializer,
- JsonDeserializer<MessageTo> messageDeserializer)
+ JsonDeserializer<ChatMessageTo> messageDeserializer)
{
Map<String, Object> properties = new HashMap<>();
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
}
@Bean
- JsonDeserializer<MessageTo> chatMessageDeserializer()
+ JsonDeserializer<ChatMessageTo> chatMessageDeserializer()
{
- JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+ JsonDeserializer<ChatMessageTo> deserializer = new JsonDeserializer<>();
deserializer.configure(
Map.of(
JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
- JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
+ JsonDeserializer.VALUE_DEFAULT_TYPE, ChatMessageTo.class,
JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
false );
return deserializer;
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.time.LocalDateTime;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class MessageTo
-{
- private String user;
- private Long id;
- private String text;
-
- public Message toMessage(long offset, LocalDateTime timestamp)
- {
- return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
- }
-
- public static MessageTo from(Message message)
- {
- return
- new MessageTo(
- message.getUsername(),
- message.getId(),
- message.getMessageText());
- }
-}
package de.juplo.kafka.chat.backend;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
{
UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
int shard = shardingStrategy.selectShard(chatRoomId);
- chatRoomTemplate.send(CHATROOMS_TOPIC, null,"{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }");
- messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }");
- messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }");
- messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }");
- messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }");
+ send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message");
+ }
+
+ static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+ {
+ ProducerRecord<String, String> record = new ProducerRecord<>(MESSAGES_TOPIC, key, value);
+ record.headers().add("__TypeId__", typeId.getBytes());
+ kafkaTemplate.send(record);
}
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class ChatMessageToTest
+{
+ final String json = """
+ {
+ "id": 1,
+ "text": "Hallo, ich heiße Peter!",
+ "user": "Peter"
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ ChatMessageTo message = mapper.readValue(json, ChatMessageTo.class);
+ assertThat(message.getId()).isEqualTo(1l);
+ assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+ assertThat(message.getUser()).isEqualTo("Peter");
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class CreateChatRoomRequestToTest
+{
+ final String json = """
+ {
+ "id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+ "name": "Foo-Room!",
+ "shard": 666
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ CreateChatRoomRequestTo message = mapper.readValue(json, CreateChatRoomRequestTo.class);
+ assertThat(message.getId()).isEqualTo("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+ assertThat(message.getName()).isEqualTo("Foo-Room!");
+ assertThat(message.getShard()).isEqualTo(666);
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-
-public class MessageToTest
-{
- final String json = """
- {
- "id": 1,
- "text": "Hallo, ich heiße Peter!",
- "user": "Peter"
- }""";
-
- ObjectMapper mapper;
-
- @BeforeEach
- public void setUp()
- {
- mapper = new ObjectMapper();
- mapper.registerModule(new JavaTimeModule());
- mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- }
-
- @Test
- public void testDeserialization() throws Exception
- {
- MessageTo message = mapper.readValue(json, MessageTo.class);
- assertThat(message.getId()).isEqualTo(1l);
- assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
- assertThat(message.getUser()).isEqualTo("Peter");
- }
-}