{
private String clientIdPrefix;
private String bootstrapServers = ":9092";
- private String chatRoomChannelTopic = "message_channel";
+ private String infoChannelTopic = "info_channel";
+ private String dataChannelTopic = "data_channel";
private int numPartitions = 2;
}
import de.juplo.kafka.chat.backend.domain.*;
import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.*;
public class DataChannel implements Runnable, ConsumerRebalanceListener
{
private final String topic;
- private final Producer<String, AbstractMessageTo> producer;
- private final Consumer<String, AbstractMessageTo> consumer;
+ private final Producer<String, AbstractDataMessageTo> producer;
+ private final Consumer<String, AbstractDataMessageTo> consumer;
private final ZoneId zoneId;
private final int numShards;
private final int bufferSize;
public DataChannel(
String topic,
- Producer<String, AbstractMessageTo> producer,
- Consumer<String, AbstractMessageTo> consumer,
+ Producer<String, AbstractDataMessageTo> producer,
+ Consumer<String, AbstractDataMessageTo> consumer,
ZoneId zoneId,
int numShards,
int bufferSize,
Clock clock)
{
log.debug(
- "Creating ChatRoomChannel for topic {} with {} partitions",
+ "Creating DataChannel for topic {} with {} partitions",
topic,
numShards);
this.topic = topic;
UUID chatRoomId,
String name)
{
- CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
+ CommandCreateChatRoomToData createChatRoomRequestTo = CommandCreateChatRoomToData.of(name);
return Mono.create(sink ->
{
- ProducerRecord<String, AbstractMessageTo> record =
+ ProducerRecord<String, AbstractDataMessageTo> record =
new ProducerRecord<>(
topic,
chatRoomId.toString(),
ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
return Mono.create(sink ->
{
- ProducerRecord<String, AbstractMessageTo> record =
+ ProducerRecord<String, AbstractDataMessageTo> record =
new ProducerRecord<>(
topic,
null,
zdt.toEpochSecond(),
chatRoomId.toString(),
- EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
+ EventChatDataMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
producer.send(record, ((metadata, exception) ->
{
{
try
{
- ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, AbstractDataMessageTo> records = consumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
if (loadInProgress)
log.info("Exiting normally");
}
- private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+ private void loadChatRoom(ConsumerRecords<String, AbstractDataMessageTo> records)
{
- for (ConsumerRecord<String, AbstractMessageTo> record : records)
+ for (ConsumerRecord<String, AbstractDataMessageTo> record : records)
{
UUID chatRoomId = UUID.fromString(record.key());
case COMMAND_CREATE_CHATROOM:
createChatRoom(
chatRoomId,
- (CommandCreateChatRoomTo) record.value(),
+ (CommandCreateChatRoomToData) record.value(),
record.partition());
break;
chatRoomId,
timestamp,
record.offset(),
- (EventChatMessageReceivedTo) record.value(),
+ (EventChatDataMessageReceivedTo) record.value(),
record.partition());
break;
private void createChatRoom(
UUID chatRoomId,
- CommandCreateChatRoomTo createChatRoomRequestTo,
+ CommandCreateChatRoomToData createChatRoomRequestTo,
Integer partition)
{
log.info(
UUID chatRoomId,
LocalDateTime timestamp,
long offset,
- EventChatMessageReceivedTo chatMessageTo,
+ EventChatDataMessageReceivedTo chatMessageTo,
int partition)
{
Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
Integer partition,
ChatRoomData chatRoomData)
{
- if (this.chatRoomInfo[partition].containsKey(chatRoomId))
+ if (this.chatRoomData[partition].containsKey(chatRoomId))
{
log.warn(
"Ignoring existing chat-room for {}: {}",
partition,
chatRoomData);
- this.chatRoomInfo[partition].put(
- chatRoomId,
- new ChatRoomInfo(chatRoomId, name, partition));
this.chatRoomData[partition].put(chatRoomId, chatRoomData);
}
}
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
-import lombok.Getter;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
@Slf4j
-public class InfoChannel implements Runnable, ConsumerRebalanceListener
+public class InfoChannel implements Runnable
{
- private final String topic;
- private final Producer<String, AbstractMessageTo> producer;
- private final Consumer<String, AbstractMessageTo> consumer;
+ private final DataChannel dataChannel;
+ private final String infoTopic;
+ private final String dataTopic;
+ private final Producer<String, AbstractDataMessageTo> producer;
+ private final Consumer<String, AbstractDataMessageTo> consumer;
private final ZoneId zoneId;
- private final int numShards;
private final int bufferSize;
private final Clock clock;
- private final boolean[] isShardOwned;
- private final long[] currentOffset;
- private final long[] nextOffset;
- private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
- private final Map<UUID, ChatRoomData>[] chatRoomData;
+ private final Map<UUID, ChatRoomInfo> chatRoomInfo;
private boolean running;
- @Getter
- private volatile boolean loadInProgress;
public InfoChannel(
- String topic,
- Producer<String, AbstractMessageTo> producer,
- Consumer<String, AbstractMessageTo> consumer,
+ DataChannel dataChannel,
+ String infoTopic,
+ String dataTopic,
+ Producer<String, AbstractDataMessageTo> producer,
+ Consumer<String, AbstractDataMessageTo> consumer,
ZoneId zoneId,
- int numShards,
int bufferSize,
Clock clock)
{
log.debug(
- "Creating ChatRoomChannel for topic {} with {} partitions",
- topic,
- numShards);
- this.topic = topic;
+ "Creating InfoChannel for topic {}",
+ infoTopic);
+ this.dataChannel = dataChannel;
+ this.infoTopic = infoTopic;
+ this.dataTopic = dataTopic;
this.consumer = consumer;
this.producer = producer;
this.zoneId = zoneId;
- this.numShards = numShards;
this.bufferSize = bufferSize;
this.clock = clock;
- this.isShardOwned = new boolean[numShards];
- this.currentOffset = new long[numShards];
- this.nextOffset = new long[numShards];
- this.chatRoomInfo = new Map[numShards];
- this.chatRoomData = new Map[numShards];
- IntStream
- .range(0, numShards)
- .forEach(shard ->
- {
- this.chatRoomInfo[shard] = new HashMap<>();
- this.chatRoomData[shard] = new HashMap<>();
- });
+ this.chatRoomInfo = new HashMap<>();
}
UUID chatRoomId,
String name)
{
- CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
+ CommandCreateChatRoomToData createChatRoomRequestTo = CommandCreateChatRoomToData.of(name);
return Mono.create(sink ->
{
- ProducerRecord<String, AbstractMessageTo> record =
+ ProducerRecord<String, AbstractDataMessageTo> record =
new ProducerRecord<>(
- topic,
+ dataTopic,
chatRoomId.toString(),
createChatRoomRequestTo);
ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
return Mono.create(sink ->
{
- ProducerRecord<String, AbstractMessageTo> record =
+ ProducerRecord<String, AbstractDataMessageTo> record =
new ProducerRecord<>(
- topic,
+ infoTopic,
null,
zdt.toEpochSecond(),
chatRoomId.toString(),
- EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
+ EventChatDataMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
producer.send(record, ((metadata, exception) ->
{
{
try
{
- ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, AbstractDataMessageTo> records = consumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
if (loadInProgress)
log.info("Exiting normally");
}
- private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+ private void loadChatRoom(ConsumerRecords<String, AbstractDataMessageTo> records)
{
- for (ConsumerRecord<String, AbstractMessageTo> record : records)
+ for (ConsumerRecord<String, AbstractDataMessageTo> record : records)
{
UUID chatRoomId = UUID.fromString(record.key());
case COMMAND_CREATE_CHATROOM:
createChatRoom(
chatRoomId,
- (CommandCreateChatRoomTo) record.value(),
+ (CommandCreateChatRoomToData) record.value(),
record.partition());
break;
chatRoomId,
timestamp,
record.offset(),
- (EventChatMessageReceivedTo) record.value(),
+ (EventChatDataMessageReceivedTo) record.value(),
record.partition());
break;
private void createChatRoom(
UUID chatRoomId,
- CommandCreateChatRoomTo createChatRoomRequestTo,
+ CommandCreateChatRoomToData createChatRoomRequestTo,
Integer partition)
{
log.info(
UUID chatRoomId,
LocalDateTime timestamp,
long offset,
- EventChatMessageReceivedTo chatMessageTo,
+ EventChatDataMessageReceivedTo chatMessageTo,
int partition)
{
Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
consumer.pause(IntStream
.range(0, numShards)
.filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> new TopicPartition(topic, shard))
+ .mapToObj(shard -> new TopicPartition(infoTopic, shard))
.toList());
}
public class KafkaChatHomeService implements ChatHomeService
{
private final int numPartitions;
- private final ChatRoomChannel chatRoomChannel;
+ private final InfoChannel infoChannel;
+ private final DataChannel dataChannel;
public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
log.info("Sending create-command for chat rooom: id={}, name={}");
- return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+ return infoChannel.sendCreateChatRoomRequest(id, name);
}
@Override
public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
int shard = selectShard(id);
- return chatRoomChannel
+ return infoChannel
.getChatRoomInfo(shard, id)
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
id,
shard,
- chatRoomChannel.getOwnedShards())));
+ dataChannel.getOwnedShards())));
}
@Override
public Flux<ChatRoomInfo> getChatRoomInfo()
{
- return chatRoomChannel.getChatRoomInfo();
+ return infoChannel.getChatRoomInfo();
}
@Override
public Mono<ChatRoomData> getChatRoomData(UUID id)
{
int shard = selectShard(id);
- return chatRoomChannel
+ return dataChannel
.getChatRoomData(shard, id)
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
id,
shard,
- chatRoomChannel.getOwnedShards())));
+ dataChannel.getOwnedShards())));
}
int selectShard(UUID chatRoomId)
@Slf4j
public class KafkaChatMessageService implements ChatMessageService
{
- private final ChatRoomChannel chatRoomChannel;
+ private final DataChannel dataChannel;
private final UUID chatRoomId;
private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
LocalDateTime timestamp,
String text)
{
- return chatRoomChannel
+ return dataChannel
.sendChatMessage(chatRoomId, key, timestamp, text)
.doOnSuccess(message -> persistMessage(message));
}
package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
public class KafkaServicesApplicationRunner implements ApplicationRunner
{
private final ThreadPoolTaskExecutor taskExecutor;
- private final ChatRoomChannel chatRoomChannel;
+ private final DataChannel dataChannel;
private final Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
private final WorkAssignor workAssignor;
workAssignor.assignWork(chatRoomChannelConsumer);
log.info("Starting the consumer for the ChatRoomChannel");
chatRoomChannelConsumerJob = taskExecutor
- .submitCompletable(chatRoomChannel)
+ .submitCompletable(dataChannel)
.exceptionally(e ->
{
log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@Bean
ChatHomeService kafkaChatHome(
ChatBackendProperties properties,
- ChatRoomChannel chatRoomChannel)
+ DataChannel dataChannel)
{
return new KafkaChatHomeService(
properties.getKafka().getNumPartitions(),
- chatRoomChannel);
+ dataChannel);
}
@Bean
- ChatRoomChannel chatRoomChannel(
+ DataChannel chatRoomChannel(
ChatBackendProperties properties,
- Producer<String, AbstractMessageTo> chatRoomChannelProducer,
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+ Producer<String, AbstractDataMessageTo> chatRoomChannelProducer,
+ Consumer<String, AbstractDataMessageTo> chatRoomChannelConsumer,
ZoneId zoneId,
Clock clock)
{
- return new ChatRoomChannel(
- properties.getKafka().getChatRoomChannelTopic(),
+ return new DataChannel(
+ properties.getKafka().getDataChannelTopic(),
chatRoomChannelProducer,
chatRoomChannelConsumer,
zoneId,
}
@Bean
- Producer<String, AbstractMessageTo> chatRoomChannelProducer(
+ Producer<String, AbstractDataMessageTo> chatRoomChannelProducer(
Properties defaultProducerProperties,
ChatBackendProperties chatBackendProperties,
StringSerializer stringSerializer,
- JsonSerializer<AbstractMessageTo> messageSerializer)
+ JsonSerializer<AbstractDataMessageTo> messageSerializer)
{
Map<String, Object> properties = new HashMap<>();
defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
}
@Bean
- JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
+ JsonSerializer<AbstractDataMessageTo> chatMessageSerializer(String typeMappings)
{
- JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
+ JsonSerializer<AbstractDataMessageTo> serializer = new JsonSerializer<>();
serializer.configure(
Map.of(
JsonSerializer.TYPE_MAPPINGS, typeMappings),
}
@Bean
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer(
+ Consumer<String, AbstractDataMessageTo> chatRoomChannelConsumer(
Properties defaultConsumerProperties,
ChatBackendProperties chatBackendProperties,
StringDeserializer stringDeserializer,
- JsonDeserializer<AbstractMessageTo> messageDeserializer)
+ JsonDeserializer<AbstractDataMessageTo> messageDeserializer)
{
Map<String, Object> properties = new HashMap<>();
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
}
@Bean
- JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
+ JsonDeserializer<AbstractDataMessageTo> chatMessageDeserializer(String typeMappings)
{
- JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
+ JsonDeserializer<AbstractDataMessageTo> deserializer = new JsonDeserializer<>();
deserializer.configure(
Map.of(
JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
String typeMappings ()
{
return
- "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," +
- "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
+ "command_create_chatroom:" + CommandCreateChatRoomToData.class.getCanonicalName() + "," +
+ "event_chatmessage_received:" + EventChatDataMessageReceivedTo.class.getCanonicalName();
}
@Bean
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
import lombok.Getter;
@RequiredArgsConstructor
-public class AbstractMessageTo
+public class AbstractDataMessageTo
{
public enum ToType {
COMMAND_CREATE_CHATROOM,
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
import lombok.*;
@Setter
@EqualsAndHashCode
@ToString
-public class CommandCreateChatRoomTo extends AbstractMessageTo
+public class CommandCreateChatRoomToData extends AbstractDataMessageTo
{
private String name;
- public CommandCreateChatRoomTo()
+ public CommandCreateChatRoomToData()
{
super(ToType.COMMAND_CREATE_CHATROOM);
}
- public static CommandCreateChatRoomTo of(String name)
+ public static CommandCreateChatRoomToData of(String name)
{
- CommandCreateChatRoomTo to = new CommandCreateChatRoomTo();
+ CommandCreateChatRoomToData to = new CommandCreateChatRoomToData();
to.name = name;
return to;
}
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
import lombok.*;
@Setter
@EqualsAndHashCode
@ToString
-public class EventChatMessageReceivedTo extends AbstractMessageTo
+public class EventChatDataMessageReceivedTo extends AbstractDataMessageTo
{
private String user;
private Long id;
private String text;
- public EventChatMessageReceivedTo()
+ public EventChatDataMessageReceivedTo()
{
super(ToType.EVENT_CHATMESSAGE_RECEIVED);
}
- public static EventChatMessageReceivedTo of(String user, Long id, String text)
+ public static EventChatDataMessageReceivedTo of(String user, Long id, String text)
{
- EventChatMessageReceivedTo to = new EventChatMessageReceivedTo();
+ EventChatDataMessageReceivedTo to = new EventChatDataMessageReceivedTo();
to.user = user;
to.id = id;
to.text = text;
-package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;public class EventChatRoomCreated {
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventChatRoomCreated extends AbstractInfoMessageTo
+{
+ private String id;
+ private String name;
+ private Integer shard;
+
+
+ public EventChatRoomCreated(ToType type)
+ {
+ super(type);
+ }
}
@Getter(AccessLevel.PACKAGE)
@Setter(AccessLevel.PACKAGE)
@EqualsAndHashCode(of = { "id" })
-@ToString(of = { "id", "shard", "name" })
+@ToString(of = { "id", "name" })
@Document
public class ChatRoomTo
{
@Id
private String id;
- private Integer shard;
private String name;
public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
{
return new ChatRoomTo(
chatRoomInfo.getId().toString(),
- chatRoomInfo.getShard(),
chatRoomInfo.getName());
}
}
package de.juplo.kafka.chat.backend;
-import de.juplo.kafka.chat.backend.implementation.kafka.ChatRoomChannel;
+import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel;
import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesApplicationRunner;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
@SpringBootTest(
"chat.backend.kafka.client-id-PREFIX=TEST",
"chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+ "chat.backend.kafka.info-topic=" + INFO_TOPIC,
+ "chat.backend.kafka.data-topic=" + DATA_TOPIC,
"chat.backend.kafka.num-partitions=10",
})
-@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@EmbeddedKafka(
+ topics = { INFO_TOPIC, DATA_TOPIC },
+ partitions = 10)
@Slf4j
class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
{
- final static String TOPIC = "KAFKA_CONFIGURATION_IT";
+ final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
+ final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
static CompletableFuture<Void> CONSUMER_JOB;
@Autowired KafkaTemplate<String, String> messageTemplate,
@Autowired Consumer chatRoomChannelConsumer,
@Autowired ThreadPoolTaskExecutor taskExecutor,
- @Autowired ChatRoomChannel chatRoomChannel)
+ @Autowired DataChannel dataChannel)
{
- send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
+ send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "command_create_chatroom");
send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
- List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
+ List<TopicPartition> assignedPartitions = List.of(new TopicPartition(INFO_TOPIC, 2));
chatRoomChannelConsumer.assign(assignedPartitions);
- chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+ dataChannel.onPartitionsAssigned(assignedPartitions);
CONSUMER_JOB = taskExecutor
- .submitCompletable(chatRoomChannel)
+ .submitCompletable(dataChannel)
.exceptionally(e ->
{
log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
{
- ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+ ProducerRecord<String, String> record = new ProducerRecord<>(INFO_TOPIC, key, value);
record.headers().add("__TypeId__", typeId.getBytes());
SendResult<String, String> result = kafkaTemplate.send(record).join();
log.info(
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
@Test
public void testDeserialization() throws Exception
{
- CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class);
+ CommandCreateChatRoomToData message = mapper.readValue(json, CommandCreateChatRoomToData.class);
assertThat(message.getName()).isEqualTo("Foo-Room!");
}
}
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
@Test
public void testDeserialization() throws Exception
{
- EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class);
+ EventChatDataMessageReceivedTo message = mapper.readValue(json, EventChatDataMessageReceivedTo.class);
assertThat(message.getId()).isEqualTo(1l);
assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
assertThat(message.getUser()).isEqualTo("Peter");