import de.juplo.kafka.chat.backend.domain.*;
import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import lombok.Getter;
public class DataChannel implements Runnable, ConsumerRebalanceListener
{
private final String topic;
- private final Producer<String, AbstractDataMessageTo> producer;
- private final Consumer<String, AbstractDataMessageTo> consumer;
+ private final Producer<String, AbstractMessageTo> producer;
+ private final Consumer<String, AbstractMessageTo> consumer;
private final ZoneId zoneId;
private final int numShards;
private final int bufferSize;
public DataChannel(
String topic,
- Producer<String, AbstractDataMessageTo> producer,
- Consumer<String, AbstractDataMessageTo> consumer,
+ Producer<String, AbstractMessageTo> producer,
+ Consumer<String, AbstractMessageTo> dataChannelConsumer,
ZoneId zoneId,
int numShards,
int bufferSize,
topic,
numShards);
this.topic = topic;
- this.consumer = consumer;
+ this.consumer = dataChannelConsumer;
this.producer = producer;
this.zoneId = zoneId;
this.numShards = numShards;
CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
return Mono.create(sink ->
{
- ProducerRecord<String, AbstractDataMessageTo> record =
+ ProducerRecord<String, AbstractMessageTo> record =
new ProducerRecord<>(
topic,
chatRoomId.toString(),
ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
return Mono.create(sink ->
{
- ProducerRecord<String, AbstractDataMessageTo> record =
+ ProducerRecord<String, AbstractMessageTo> record =
new ProducerRecord<>(
topic,
null,
{
try
{
- ConsumerRecords<String, AbstractDataMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
if (loadInProgress)
log.info("Exiting normally");
}
- private void loadChatRoom(ConsumerRecords<String, AbstractDataMessageTo> records)
+ private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
{
- for (ConsumerRecord<String, AbstractDataMessageTo> record : records)
+ for (ConsumerRecord<String, AbstractMessageTo> record : records)
{
UUID chatRoomId = UUID.fromString(record.key());
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import java.time.*;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
{
private final String infoTopic;
private final String dataTopic;
- private final Producer<String, AbstractDataMessageTo> producer;
- private final Consumer<String, AbstractDataMessageTo> consumer;
+ private final Producer<String, AbstractMessageTo> producer;
+ private final Consumer<String, AbstractMessageTo> consumer;
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
private boolean running;
public InfoChannel(
String infoTopic,
String dataTopic,
- Producer<String, AbstractDataMessageTo> producer,
- Consumer<String, AbstractDataMessageTo> consumer,
- ZoneId zoneId)
+ Producer<String, AbstractMessageTo> producer,
+ Consumer<String, AbstractMessageTo> infoChannelConsumer)
{
log.debug(
"Creating InfoChannel for topic {}",
infoTopic);
this.infoTopic = infoTopic;
this.dataTopic = dataTopic;
- this.consumer = consumer;
+ this.consumer = infoChannelConsumer;
this.producer = producer;
this.chatRoomInfo = new HashMap<>();
}
CommandCreateChatRoomTo to = CommandCreateChatRoomTo.of(name);
return Mono.create(sink ->
{
- ProducerRecord<String, AbstractDataMessageTo> record =
+ ProducerRecord<String, AbstractMessageTo> record =
new ProducerRecord<>(
dataTopic,
chatRoomId.toString(),
{
try
{
- ConsumerRecords<String, AbstractDataMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
if (loadInProgress)
package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import org.apache.kafka.clients.consumer.Consumer;
@Bean
DataChannel chatRoomChannel(
ChatBackendProperties properties,
- Producer<String, AbstractDataMessageTo> chatRoomChannelProducer,
- Consumer<String, AbstractDataMessageTo> chatRoomChannelConsumer,
+ Producer<String, AbstractMessageTo> chatRoomChannelProducer,
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
ZoneId zoneId,
Clock clock)
{
}
@Bean
- Producer<String, AbstractDataMessageTo> chatRoomChannelProducer(
+ Producer<String, AbstractMessageTo> chatRoomChannelProducer(
Properties defaultProducerProperties,
ChatBackendProperties chatBackendProperties,
StringSerializer stringSerializer,
- JsonSerializer<AbstractDataMessageTo> messageSerializer)
+ JsonSerializer<AbstractMessageTo> messageSerializer)
{
Map<String, Object> properties = new HashMap<>();
defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
}
@Bean
- JsonSerializer<AbstractDataMessageTo> chatMessageSerializer(String typeMappings)
+ JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
{
- JsonSerializer<AbstractDataMessageTo> serializer = new JsonSerializer<>();
+ JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
serializer.configure(
Map.of(
JsonSerializer.TYPE_MAPPINGS, typeMappings),
}
@Bean
- Consumer<String, AbstractDataMessageTo> chatRoomChannelConsumer(
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer(
Properties defaultConsumerProperties,
ChatBackendProperties chatBackendProperties,
StringDeserializer stringDeserializer,
- JsonDeserializer<AbstractDataMessageTo> messageDeserializer)
+ JsonDeserializer<AbstractMessageTo> messageDeserializer)
{
Map<String, Object> properties = new HashMap<>();
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
}
@Bean
- JsonDeserializer<AbstractDataMessageTo> chatMessageDeserializer(String typeMappings)
+ JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
{
- JsonDeserializer<AbstractDataMessageTo> deserializer = new JsonDeserializer<>();
+ JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
deserializer.configure(
Map.of(
JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
-package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages;
import lombok.Getter;
@RequiredArgsConstructor
-public class AbstractDataMessageTo
+public class AbstractMessageTo
{
public enum ToType {
COMMAND_CREATE_CHATROOM,
EVENT_CHATMESSAGE_RECEIVED,
+ EVENT_CHATROOM_CREATED,
}
@Getter
package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import lombok.*;
@Setter
@EqualsAndHashCode
@ToString
-public class CommandCreateChatRoomTo extends AbstractDataMessageTo
+public class CommandCreateChatRoomTo extends AbstractMessageTo
{
private String name;
package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import lombok.*;
@Setter
@EqualsAndHashCode
@ToString
-public class EventChatMessageReceivedTo extends AbstractDataMessageTo
+public class EventChatMessageReceivedTo extends AbstractMessageTo
{
private String user;
private Long id;
package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
@Setter
@EqualsAndHashCode
@ToString
-public class EventChatRoomCreated extends AbstractInfoMessageTo
+public class EventChatRoomCreated extends AbstractMessageTo
{
private String id;
private String name;