From: Kai Moritz Date: Fri, 21 Apr 2023 08:28:02 +0000 (+0200) Subject: NG X-Git-Tag: rebase--2023-08-18-abends~17 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=2675dd739782da0b1606b2d88c5f6ae52078463e;p=demos%2Fkafka%2Fchat NG NEU vs. NG ?? Besser: - Create-Requests für ChatRoom's auch in den Message-Channel schreiben - Dann, wenn sie dort gelesen werden, _zusätzlich nachträglich_ in den Chatroom-Channel. - Grund: Dann fällt das hier übrig gebliebene _nicht-triviale_ Problem weg, bzw. löst sich in Wohlgefallen auf, da die Create-Requests automatisch in der richtigen Reihenfolge (also vor allen Messages, für einen bestimmten ChatRoom) in dem Message-Channel gelesen werden Außerdem: - Der Chatroom-Channel wird ("später") auch als allgemeiner Info-Channel benötigt, in den die Instanzen _insbesondere_ auch veröffentlichen, welche Partitionen ihnen gerade zugeordnet sind. - Der Chatroom-Channel sollte daher auf Dauer Info-Channel heißen und der Message-Channel eher allgemeiner Chatroom-Channel (im Sinne von hier alles zum Thema ChatRoom und den daran veröffentlichten Nachrichten...) --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 6091c0c5..15d542a7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -10,5 +10,5 @@ public interface ChatHome { Mono getChatRoom(UUID id); - Flux getChatRooms(); + Flux getChatRooms(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java index bedd0aac..38076800 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java @@ -1,11 +1,12 @@ package de.juplo.kafka.chat.backend.persistence; import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import reactor.core.publisher.Flux; public interface StorageStrategy { - void write(Flux chatroomFlux); + void write(Flux chatroomFlux); Flux read(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java new file mode 100644 index 00000000..5c08aa2a --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public class AbstractTo +{ + public enum ToType { + CREATE_CHATROOM_REQUEST, + MESSAGE_SENT, + CHATROOM_INFO + } + + @Getter + private final ToType type; +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index ac30f1d3..94f6fa6b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -1,8 +1,8 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -25,15 +25,16 @@ import java.util.stream.IntStream; public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { private final String topic; - private final Producer producer; - private final Consumer consumer; + private final Producer producer; + private final Consumer consumer; private final ZoneId zoneId; private final int numShards; + private final int bufferSize; + private final Clock clock; private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; private final Map[] chatrooms; - private final KafkaLikeShardingStrategy shardingStrategy; private boolean running; @Getter @@ -42,10 +43,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener public ChatMessageChannel( String topic, - Producer producer, - Consumer consumer, + Producer producer, + Consumer consumer, ZoneId zoneId, - int numShards) + int numShards, + int bufferSize, + Clock clock) { log.debug( "Creating ChatMessageChannel for topic {} with {} partitions", @@ -56,6 +59,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener this.producer = producer; this.zoneId = zoneId; this.numShards = numShards; + this.bufferSize = bufferSize; + this.clock = clock; this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; @@ -63,28 +68,62 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener IntStream .range(0, numShards) .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); - this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); } - Mono sendMessage( + + Mono sendCreateChatRoomRequest( + UUID chatRoomId, + String name) + { + CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(name); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + chatRoomId.toString(), + createChatRoomRequestTo); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition()); + createChatRoom(chatRoomInfo); + sink.success(chatRoomInfo); + } + else + { + // On send-failure + log.error( + "Could not send create-request for chat room (id={}, name={}): {}", + chatRoomId, + name, + exception); + sink.error(exception); + } + })); + }); + } + + Mono sendChatMessage( UUID chatRoomId, Message.MessageKey key, LocalDateTime timestamp, String text) { - int shard = this.shardingStrategy.selectShard(chatRoomId); - TopicPartition tp = new TopicPartition(topic, shard); ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( - tp.topic(), - tp.partition(), + topic, + null, zdt.toEpochSecond(), chatRoomId.toString(), - MessageTo.of(key.getUsername(), key.getMessageId(), text)); + ChatMessageTo.of(key.getUsername(), key.getMessageId(), text)); producer.send(record, ((metadata, exception) -> { @@ -165,12 +204,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); if (loadInProgress) { - loadMessages(records); + loadChatRoom(records); if (isLoadingCompleted()) { @@ -198,33 +237,84 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - void loadMessages(ConsumerRecords records) + void loadChatRoom(ConsumerRecords records) { - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { - nextOffset[record.partition()] = record.offset() + 1; UUID chatRoomId = UUID.fromString(record.key()); - MessageTo messageTo = record.value(); - - Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId()); - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); - - Message message = new Message(key, record.offset(), timestamp, messageTo.getText()); - - ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId); - if (chatRoom == null) + switch (record.value().getType()) { - // Alles pausieren und erst von putChatRoom wieder resumen lassen! + case CREATE_CHATROOM_REQUEST: + createChatRoom( + chatRoomId, + (CreateChatRoomRequestTo) record.value(), + record.partition()); + break; + + case MESSAGE_SENT: + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + loadChatMessage( + chatRoomId, + timestamp, + record.offset(), + (ChatMessageTo) record.value(), + record.partition()); + break; + + default: + log.debug( + "Ignoring message for chat-room {} with offset {}: {}", + chatRoomId, + record.offset(), + record.value()); } - KafkaChatRoomService kafkaChatRoomService = - (KafkaChatRoomService) chatRoom.getChatRoomService(); - kafkaChatRoomService.persistMessage(message); + nextOffset[record.partition()] = record.offset() + 1; } } + void createChatRoom( + UUID chatRoomId, + CreateChatRoomRequestTo createChatRoomRequestTo, + int partition) + { + putChatRoom(ChatRoomInfo.of( + chatRoomId, + createChatRoomRequestTo.getName(), + partition)); + } + + + void createChatRoom(ChatRoomInfo chatRoomInfo) + { + UUID id = chatRoomInfo.getId(); + String name = chatRoomInfo.getName(); + int shard = chatRoomInfo.getShard(); + log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, id); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + putChatRoom(chatRoom); + } + + void loadChatMessage( + UUID chatRoomId, + LocalDateTime timestamp, + long offset, + ChatMessageTo chatMessageTo, + int partition) + { + Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); + Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); + + ChatRoom chatRoom = chatrooms[partition].get(chatRoomId); + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoom.getChatRoomService(); + + kafkaChatRoomService.persistMessage(message); + } + boolean isLoadingCompleted() { return IntStream @@ -243,7 +333,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener } - void putChatRoom(ChatRoom chatRoom) + private void putChatRoom(ChatRoom chatRoom) { Integer partition = chatRoom.getShard(); UUID chatRoomId = chatRoom.getId(); @@ -266,13 +356,4 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { return Mono.justOrEmpty(chatrooms[shard].get(id)); } - - Flux getChatRooms() - { - return Flux.fromStream(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> Integer.valueOf(shard)) - .flatMap(shard -> chatrooms[shard].values().stream())); - } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java new file mode 100644 index 00000000..41ce00a4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java @@ -0,0 +1,45 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.Data; + +import java.time.LocalDateTime; + + +@Data +public class ChatMessageTo extends AbstractTo +{ + private String user; + private Long id; + private String text; + + + public ChatMessageTo() + { + super(ToType.MESSAGE_SENT); + } + + + public Message toMessage(long offset, LocalDateTime timestamp) + { + return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); + } + + public static ChatMessageTo from(Message message) + { + return ChatMessageTo.of( + message.getUsername(), + message.getId(), + message.getMessageText()); + } + + + public static ChatMessageTo of(String user, Long id, String text) + { + ChatMessageTo to = new ChatMessageTo(); + to.user = user; + to.id = id; + to.text = text; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 97ee9885..1c6ae915 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -10,11 +10,15 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.stream.IntStream; @RequiredArgsConstructor @@ -22,51 +26,12 @@ import java.util.UUID; public class ChatRoomChannel implements Runnable { private final String topic; - private final Producer producer; - private final Consumer consumer; - private final ShardingStrategy shardingStrategy; - private final ChatMessageChannel chatMessageChannel; - private final Clock clock; - private final int bufferSize; + private final Consumer consumer; + private final Map chatrooms = new HashMap<>(); private boolean running; - Mono sendCreateChatRoomRequest( - UUID chatRoomId, - String name) - { - int shard = this.shardingStrategy.selectShard(chatRoomId); - ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId.toString(), name, shard); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - shard, - chatRoomTo); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - log.info("Successfully send chreate-request for chat room: {}", chatRoomTo); - sink.success(chatRoomTo.toChatRoomInfo()); - } - else - { - // On send-failure - log.error( - "Could not send create-request for chat room (id={}, name={}): {}", - chatRoomId, - name, - exception); - sink.error(exception); - } - })); - }); - } - @Override public void run() { @@ -78,12 +43,24 @@ public class ChatRoomChannel implements Runnable { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { - createChatRoom(record.value().toChatRoomInfo()); + switch (record.value().getType()) + { + case CHATROOM_INFO: + createChatRoom((ChatRoomInfoTo) record.value()); + break; + + default: + log.debug( + "Ignoring message for key {} with offset {}: {}", + record.key(), + record.offset(), + record.value()); + } } } catch (WakeupException e) @@ -97,14 +74,14 @@ public class ChatRoomChannel implements Runnable } - void createChatRoom(ChatRoomInfo chatRoomInfo) + void createChatRoom(ChatRoomInfoTo chatRoomInfoTo) + { + ChatRoomInfo chatRoomInfo = chatRoomInfoTo.toChatRoomInfo(); + chatrooms.put(chatRoomInfo.getId(), chatRoomInfo); + } + + Flux getChatRooms() { - UUID id = chatRoomInfo.getId(); - String name = chatRoomInfo.getName(); - int shard = chatRoomInfo.getShard(); - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id); - ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - chatMessageChannel.putChatRoom(chatRoom); + return Flux.fromIterable(chatrooms.values()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java new file mode 100644 index 00000000..f232c782 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java @@ -0,0 +1,42 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.Data; + +import java.util.UUID; + + +@Data +public class ChatRoomInfoTo extends AbstractTo +{ + private String id; + private String name; + private int shard; + + + public ChatRoomInfoTo() + { + super(ToType.CHATROOM_INFO); + } + + + public ChatRoomInfo toChatRoomInfo() + { + return new ChatRoomInfo(UUID.fromString(id), name, shard); + } + + public static ChatRoomInfoTo from(ChatRoom chatRoom) + { + return ChatRoomInfoTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard()); + } + + public static ChatRoomInfoTo of(String id, String name, int shard) + { + ChatRoomInfoTo to = new ChatRoomInfoTo(); + to.id = id; + to.name = name; + to.shard = shard; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java deleted file mode 100644 index e5649816..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java +++ /dev/null @@ -1,30 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.UUID; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class ChatRoomTo -{ - private String id; - private String name; - private int shard; - - public ChatRoomInfo toChatRoomInfo() - { - return new ChatRoomInfo(UUID.fromString(id), name, shard); - } - - public static ChatRoomTo from(ChatRoom chatRoom) - { - return ChatRoomTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard()); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java new file mode 100644 index 00000000..b6ad38fb --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java @@ -0,0 +1,28 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.Data; + +import java.util.UUID; + + +@Data +public class CreateChatRoomRequestTo extends AbstractTo +{ + private String name; + + + public CreateChatRoomRequestTo() + { + super(ToType.CREATE_CHATROOM_REQUEST); + } + + + public static CreateChatRoomRequestTo of(String name) + { + CreateChatRoomRequestTo to = new CreateChatRoomRequestTo(); + to.name = name; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 88947a04..ac87aac8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -17,6 +16,7 @@ import java.util.*; public class KafkaChatHome implements ChatHome { private final ShardingStrategy shardingStrategy; + private final ChatRoomChannel chatRoomChannel; private final ChatMessageChannel chatMessageChanel; @@ -35,15 +35,8 @@ public class KafkaChatHome implements ChatHome } @Override - public Flux getChatRooms() + public Flux getChatRooms() { - if (chatMessageChanel.isLoadInProgress()) - { - throw new LoadInProgressException(); - } - else - { - return chatMessageChanel.getChatRooms(); - } + return chatRoomChannel.getChatRooms(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 09861946..f8022348 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -28,7 +28,7 @@ public class KafkaChatRoomService implements ChatRoomService String text) { return chatMessageChannel - .sendMessage(chatRoomId, key, timestamp, text) + .sendChatMessage(chatRoomId, key, timestamp, text) .doOnSuccess(message -> persistMessage(message)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index ee5834e5..8a9e32ed 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -30,11 +30,11 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Autowired ChatRoomChannel chatRoomChannel; @Autowired - Consumer chatRoomChannelConsumer; + Consumer chatRoomChannelConsumer; @Autowired ChatMessageChannel chatMessageChannel; @Autowired - Consumer chatMessageChannelConsumer; + Consumer chatMessageChannelConsumer; CompletableFuture chatRoomChannelConsumerJob; CompletableFuture chatMessageChannelConsumerJob; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 43507792..9e1f75e7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -51,8 +51,8 @@ public class KafkaServicesConfiguration @Bean ChatRoomChannel chatRoomChannel( ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, ShardingStrategy shardingStrategy, ChatMessageChannel chatMessageChannel, Clock clock) @@ -68,11 +68,11 @@ public class KafkaServicesConfiguration } @Bean - Producer chatRoomChannelProducer( + Producer chatRoomChannelProducer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, IntegerSerializer integerSerializer, - JsonSerializer chatRoomSerializer) + JsonSerializer chatRoomSerializer) { Map properties = new HashMap<>(); defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -92,9 +92,9 @@ public class KafkaServicesConfiguration } @Bean - JsonSerializer chatRoomSerializer() + JsonSerializer chatRoomSerializer() { - JsonSerializer serializer = new JsonSerializer<>(); + JsonSerializer serializer = new JsonSerializer<>(); serializer.configure( Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), false); @@ -102,11 +102,11 @@ public class KafkaServicesConfiguration } @Bean - Consumer chatRoomChannelConsumer( + Consumer chatRoomChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, IntegerDeserializer integerDeserializer, - JsonDeserializer chatRoomDeserializer) + JsonDeserializer chatRoomDeserializer) { Map properties = new HashMap<>(); defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -129,13 +129,13 @@ public class KafkaServicesConfiguration } @Bean - JsonDeserializer chatRoomDeserializer() + JsonDeserializer chatRoomDeserializer() { - JsonDeserializer deserializer = new JsonDeserializer<>(); + JsonDeserializer deserializer = new JsonDeserializer<>(); deserializer.configure( Map.of( JsonDeserializer.USE_TYPE_INFO_HEADERS, false, - JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class, + JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class, JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), false ); return deserializer; @@ -150,8 +150,8 @@ public class KafkaServicesConfiguration @Bean ChatMessageChannel chatMessageChannel( ChatBackendProperties properties, - Producer chatMessageChannelProducer, - Consumer chatMessageChannelConsumer, + Producer chatMessageChannelProducer, + Consumer chatMessageChannelConsumer, ZoneId zoneId) { return new ChatMessageChannel( @@ -163,11 +163,11 @@ public class KafkaServicesConfiguration } @Bean - Producer chatMessageChannelProducer( + Producer chatMessageChannelProducer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, - JsonSerializer messageSerializer) + JsonSerializer messageSerializer) { Map properties = new HashMap<>(); defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -187,21 +187,23 @@ public class KafkaServicesConfiguration } @Bean - JsonSerializer chatMessageSerializer() + JsonSerializer chatMessageSerializer() { - JsonSerializer serializer = new JsonSerializer<>(); + JsonSerializer serializer = new JsonSerializer<>(); serializer.configure( - Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + Map.of(JsonSerializer.TYPE_MAPPINGS, + "create:" + CreateChatRoomRequestTo.class.getCanonicalName() + "," + + "message:" + ChatMessageTo.class.getCanonicalName()), false); return serializer; } @Bean - Consumer chatMessageChannelConsumer( + Consumer chatMessageChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, - JsonDeserializer messageDeserializer) + JsonDeserializer messageDeserializer) { Map properties = new HashMap<>(); defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -224,13 +226,13 @@ public class KafkaServicesConfiguration } @Bean - JsonDeserializer chatMessageDeserializer() + JsonDeserializer chatMessageDeserializer() { - JsonDeserializer deserializer = new JsonDeserializer<>(); + JsonDeserializer deserializer = new JsonDeserializer<>(); deserializer.configure( Map.of( JsonDeserializer.USE_TYPE_INFO_HEADERS, false, - JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class, + JsonDeserializer.VALUE_DEFAULT_TYPE, ChatMessageTo.class, JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), false ); return deserializer; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java deleted file mode 100644 index 0a867f16..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java +++ /dev/null @@ -1,33 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.Message; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.LocalDateTime; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class MessageTo -{ - private String user; - private Long id; - private String text; - - public Message toMessage(long offset, LocalDateTime timestamp) - { - return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); - } - - public static MessageTo from(Message message) - { - return - new MessageTo( - message.getUsername(), - message.getId(), - message.getMessageText()); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index fc2b7c89..4e9ad239 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -37,10 +38,17 @@ class KafkaConfigurationIT extends AbstractConfigurationIT { UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); int shard = shardingStrategy.selectShard(chatRoomId); - chatRoomTemplate.send(CHATROOMS_TOPIC, null,"{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }"); - messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }"); - messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }"); - messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }"); - messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }"); + send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message"); + } + + static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) + { + ProducerRecord record = new ProducerRecord<>(MESSAGES_TOPIC, key, value); + record.headers().add("__TypeId__", typeId.getBytes()); + kafkaTemplate.send(record); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageToTest.java new file mode 100644 index 00000000..4a6c1c32 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageToTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class ChatMessageToTest +{ + final String json = """ + { + "id": 1, + "text": "Hallo, ich heiße Peter!", + "user": "Peter" + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + ChatMessageTo message = mapper.readValue(json, ChatMessageTo.class); + assertThat(message.getId()).isEqualTo(1l); + assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); + assertThat(message.getUser()).isEqualTo("Peter"); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoToTest.java new file mode 100644 index 00000000..6132eace --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoToTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class ChatRoomInfoToTest +{ + final String json = """ + { + "id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7", + "name": "Foo-Room!", + "shard": 666 + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + ChatRoomInfoTo message = mapper.readValue(json, ChatRoomInfoTo.class); + assertThat(message.getId()).isEqualTo("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); + assertThat(message.getName()).isEqualTo("Foo-Room!"); + assertThat(message.getShard()).isEqualTo(666); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java new file mode 100644 index 00000000..e7b749c4 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class CreateChatRoomRequestToTest +{ + final String json = """ + { + "id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7", + "name": "Foo-Room!", + "shard": 666 + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + CreateChatRoomRequestTo message = mapper.readValue(json, CreateChatRoomRequestTo.class); + assertThat(message.getId()).isEqualTo("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); + assertThat(message.getName()).isEqualTo("Foo-Room!"); + assertThat(message.getShard()).isEqualTo(666); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java deleted file mode 100644 index 0c4884bf..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java +++ /dev/null @@ -1,39 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; - - -public class MessageToTest -{ - final String json = """ - { - "id": 1, - "text": "Hallo, ich heiße Peter!", - "user": "Peter" - }"""; - - ObjectMapper mapper; - - @BeforeEach - public void setUp() - { - mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - } - - @Test - public void testDeserialization() throws Exception - { - MessageTo message = mapper.readValue(json, MessageTo.class); - assertThat(message.getId()).isEqualTo(1l); - assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); - assertThat(message.getUser()).isEqualTo("Peter"); - } -}