@Slf4j
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
{
- private Consumer<String, >
+ private Consumer<String, MessageTo>
private final long[] offsets;
private final Map<UUID, ChatRoom>[] chatrooms;
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class MessageTo
+{
+ private Long id;
+ private String user;
+ private String text;
+
+ public Message toMessage(long offset, LocalDateTime timestamp)
+ {
+ return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
+ }
+
+ public static MessageTo from(Message message)
+ {
+ return
+ new MessageTo(
+ message.getId(),
+ message.getUsername(),
+ message.getMessageText());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class MessageToTest
+{
+ final String json = """
+ {
+ "id": 1,
+ "text": "Hallo, ich heiße Peter!",
+ "user": "Peter"
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ MessageTo message = mapper.readValue(json, MessageTo.class);
+ assertThat(message.getId()).isEqualTo(1l);
+ assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+ assertThat(message.getUser()).isEqualTo("Peter");
+ }
+}