{
private String clientIdPrefix;
private String bootstrapServers = ":9092";
- private String chatRoomChannelTopic = "message_channel";
+ private String infoChannelTopic = "info_channel";
+ private String dataChannelTopic = "data_channel";
private int numPartitions = 2;
}
private final String name;
@Getter
private final Integer shard;
+
+
+ public ChatRoomInfo(UUID id, String name)
+ {
+ this(id, name, null);
+ }
}
}
- interface WorkAssignor
+ public interface WorkAssignor
{
void assignWork(Consumer<?, ?> consumer);
}
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Bundles the lifecycle of the two Kafka consumer loops.
+ * <p>
+ * {@code run()} starts the info-channel consumer task first and the
+ * data-channel consumer task second; {@code joinConsumerTasks()} joins them
+ * in the reverse order, so that the data-channel task is awaited before the
+ * info-channel task on shutdown.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class ConsumerTaskRunner
+{
+  private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
+  private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+
+  public void run()
+  {
+    infoChannelConsumerTaskExecutor.executeConsumerTask();
+    dataChannelConsumerTaskExecutor.executeConsumerTask();
+  }
+
+  public void joinConsumerTasks()
+  {
+    dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+    infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
+  }
+}
import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.*;
@Slf4j
-public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+public class DataChannel implements Runnable, ConsumerRebalanceListener
{
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
- private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
private final Map<UUID, ChatRoomData>[] chatRoomData;
private boolean running;
private volatile boolean loadInProgress;
- public ChatRoomChannel(
+ public DataChannel(
String topic,
Producer<String, AbstractMessageTo> producer,
- Consumer<String, AbstractMessageTo> consumer,
+ Consumer<String, AbstractMessageTo> dataChannelConsumer,
ZoneId zoneId,
int numShards,
int bufferSize,
Clock clock)
{
log.debug(
- "Creating ChatRoomChannel for topic {} with {} partitions",
+ "Creating DataChannel for topic {} with {} partitions",
topic,
numShards);
this.topic = topic;
- this.consumer = consumer;
+ this.consumer = dataChannelConsumer;
this.producer = producer;
this.zoneId = zoneId;
this.numShards = numShards;
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
- this.chatRoomInfo = new Map[numShards];
this.chatRoomData = new Map[numShards];
IntStream
.range(0, numShards)
.forEach(shard ->
{
- this.chatRoomInfo[shard] = new HashMap<>();
this.chatRoomData[shard] = new HashMap<>();
});
}
- Mono<ChatRoomInfo> sendCreateChatRoomRequest(
- UUID chatRoomId,
- String name)
- {
- CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
- return Mono.create(sink ->
- {
- ProducerRecord<String, AbstractMessageTo> record =
- new ProducerRecord<>(
- topic,
- chatRoomId.toString(),
- createChatRoomRequestTo);
-
- producer.send(record, ((metadata, exception) ->
- {
- if (metadata != null)
- {
- log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
- ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
- createChatRoom(chatRoomInfo);
- sink.success(chatRoomInfo);
- }
- else
- {
- // On send-failure
- log.error(
- "Could not send create-request for chat room (id={}, name={}): {}",
- chatRoomId,
- name,
- exception);
- sink.error(exception);
- }
- }));
- });
- }
-
Mono<Message> sendChatMessage(
UUID chatRoomId,
Message.MessageKey key,
{
try
{
- ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
log.info("Fetched {} messages", records.count());
if (loadInProgress)
{
- loadChatRoom(records);
+ loadChatRoomData(records);
if (isLoadingCompleted())
{
log.info("Exiting normally");
}
- private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+ private void loadChatRoomData(ConsumerRecords<String, AbstractMessageTo> records)
{
for (ConsumerRecord<String, AbstractMessageTo> record : records)
{
switch (record.value().getType())
{
- case COMMAND_CREATE_CHATROOM:
- createChatRoom(
- chatRoomId,
- (CommandCreateChatRoomTo) record.value(),
- record.partition());
- break;
-
case EVENT_CHATMESSAGE_RECEIVED:
Instant instant = Instant.ofEpochSecond(record.timestamp());
LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
}
}
- private void createChatRoom(
- UUID chatRoomId,
- CommandCreateChatRoomTo createChatRoomRequestTo,
- Integer partition)
- {
- log.info(
- "Loading ChatRoom {} for shard {} with buffer-size {}",
- chatRoomId,
- partition,
- bufferSize);
- KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
- ChatRoomData chatRoomData = new ChatRoomData(
- clock,
- service,
- bufferSize);
- putChatRoom(
- chatRoomId,
- createChatRoomRequestTo.getName(),
- partition,
- chatRoomData);
- }
-
-
- private void createChatRoom(ChatRoomInfo chatRoomInfo)
- {
- UUID id = chatRoomInfo.getId();
- log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
- KafkaChatMessageService service = new KafkaChatMessageService(this, id);
- ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
- putChatRoom(
- chatRoomInfo.getId(),
- chatRoomInfo.getName(),
- chatRoomInfo.getShard(),
- chatRoomData);
- }
-
private void loadChatMessage(
UUID chatRoomId,
LocalDateTime timestamp,
Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
- ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
+ ChatRoomData chatRoomData = this.chatRoomData[partition].computeIfAbsent(
+ chatRoomId,
+ (id) ->
+ {
+ log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+ KafkaChatMessageService service = new KafkaChatMessageService(this, id);
+ return new ChatRoomData(clock, service, bufferSize);
+ });
KafkaChatMessageService kafkaChatRoomService =
(KafkaChatMessageService) chatRoomData.getChatRoomService();
}
- private void putChatRoom(
- UUID chatRoomId,
- String name,
- Integer partition,
- ChatRoomData chatRoomData)
- {
- if (this.chatRoomInfo[partition].containsKey(chatRoomId))
- {
- log.warn(
- "Ignoring existing chat-room for {}: {}",
- partition,
- chatRoomId);
- }
- else
- {
- log.info(
- "Adding new chat-room to partition {}: {}",
- partition,
- chatRoomData);
-
- this.chatRoomInfo[partition].put(
- chatRoomId,
- new ChatRoomInfo(chatRoomId, name, partition));
- this.chatRoomData[partition].put(chatRoomId, chatRoomData);
- }
- }
-
int[] getOwnedShards()
{
return IntStream
return Mono.justOrEmpty(chatRoomData[shard].get(id));
}
-
- Flux<ChatRoomInfo> getChatRoomInfo()
- {
- return Flux
- .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
- .filter(shard -> isShardOwned[shard])
- .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
- }
-
- Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
- {
- if (loadInProgress)
- {
- return Mono.error(new LoadInProgressException());
- }
-
- if (!isShardOwned[shard])
- {
- return Mono.error(new ShardNotOwnedException(shard));
- }
-
- return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
- }
}
package de.juplo.kafka.chat.backend.implementation.kafka;
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
-import lombok.Getter;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.*;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
import java.util.stream.IntStream;
@Slf4j
-public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+public class InfoChannel implements Runnable
{
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private final Consumer<String, AbstractMessageTo> consumer;
- private final ZoneId zoneId;
private final int numShards;
- private final int bufferSize;
- private final Clock clock;
- private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
- private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
- private final Map<UUID, ChatRoomData>[] chatRoomData;
+ private final Map<UUID, ChatRoomInfo> chatRoomInfo;
private boolean running;
- @Getter
- private volatile boolean loadInProgress;
- public ChatRoomChannel(
+ public InfoChannel(
String topic,
Producer<String, AbstractMessageTo> producer,
- Consumer<String, AbstractMessageTo> consumer,
- ZoneId zoneId,
- int numShards,
- int bufferSize,
- Clock clock)
+ Consumer<String, AbstractMessageTo> infoChannelConsumer)
{
log.debug(
- "Creating ChatRoomChannel for topic {} with {} partitions",
- topic,
- numShards);
+ "Creating InfoChannel for topic {}",
+ topic);
this.topic = topic;
- this.consumer = consumer;
+ this.consumer = infoChannelConsumer;
this.producer = producer;
- this.zoneId = zoneId;
- this.numShards = numShards;
- this.bufferSize = bufferSize;
- this.clock = clock;
- this.isShardOwned = new boolean[numShards];
+ this.chatRoomInfo = new HashMap<>();
+
+ this.numShards = consumer
+ .partitionsFor(topic)
+ .size();
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
- this.chatRoomInfo = new Map[numShards];
- this.chatRoomData = new Map[numShards];
IntStream
.range(0, numShards)
- .forEach(shard ->
- {
- this.chatRoomInfo[shard] = new HashMap<>();
- this.chatRoomData[shard] = new HashMap<>();
- });
+ .forEach(partition -> this.nextOffset[partition] = -1l);
}
+  /**
+   * Signals, whether this channel is still catching up on its topic.
+   * <p>
+   * Returns {@code true}, as long as for any partition the next offset to be
+   * read ({@code nextOffset}) is still behind the end-offset that was
+   * recorded when the run-loop started ({@code currentOffset}).
+   */
+  boolean loadInProgress()
+  {
+    return IntStream
+        .range(0, numShards)
+        .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
+  }
- Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+ Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
UUID chatRoomId,
- String name)
+ String name,
+ int shard)
{
- CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
+ EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
return Mono.create(sink ->
{
ProducerRecord<String, AbstractMessageTo> record =
new ProducerRecord<>(
topic,
- chatRoomId.toString(),
- createChatRoomRequestTo);
+ Integer.toString(shard),
+ to);
producer.send(record, ((metadata, exception) ->
{
if (metadata != null)
{
- log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+ log.info("Successfully sent create-request for chat room: {}", to);
ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
- createChatRoom(chatRoomInfo);
sink.success(chatRoomInfo);
}
else
});
}
- Mono<Message> sendChatMessage(
- UUID chatRoomId,
- Message.MessageKey key,
- LocalDateTime timestamp,
- String text)
- {
- ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
- return Mono.create(sink ->
- {
- ProducerRecord<String, AbstractMessageTo> record =
- new ProducerRecord<>(
- topic,
- null,
- zdt.toEpochSecond(),
- chatRoomId.toString(),
- EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
-
- producer.send(record, ((metadata, exception) ->
- {
- if (metadata != null)
- {
- // On successful send
- Message message = new Message(key, metadata.offset(), timestamp, text);
- log.info("Successfully send message {}", message);
- sink.success(message);
- }
- else
- {
- // On send-failure
- log.error(
- "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
- chatRoomId,
- key,
- timestamp,
- text,
- exception);
- sink.error(exception);
- }
- }));
- });
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- log.info("Newly assigned partitions! Pausing normal operations...");
- loadInProgress = true;
-
- consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
- {
- int partition = topicPartition.partition();
- isShardOwned[partition] = true;
- this.currentOffset[partition] = currentOffset;
-
- log.info(
- "Partition assigned: {} - loading messages: next={} -> current={}",
- partition,
- nextOffset[partition],
- currentOffset);
-
- consumer.seek(topicPartition, nextOffset[partition]);
- });
-
- consumer.resume(partitions);
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions.forEach(topicPartition ->
- {
- int partition = topicPartition.partition();
- isShardOwned[partition] = false;
- log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
- });
- }
-
- @Override
- public void onPartitionsLost(Collection<TopicPartition> partitions)
- {
- log.warn("Lost partitions: {}, partitions");
- // TODO: Muss auf den Verlust anders reagiert werden?
- onPartitionsRevoked(partitions);
- }
@Override
public void run()
{
running = true;
+ consumer
+ .endOffsets(consumer.assignment())
+ .entrySet()
+ .stream()
+ .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
+ IntStream
+ .range(0, numShards)
+ .forEach(partition -> this.nextOffset[partition] = 0l);
+
while (running)
{
try
{
- ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
- log.info("Fetched {} messages", records.count());
-
- if (loadInProgress)
- {
- loadChatRoom(records);
-
- if (isLoadingCompleted())
- {
- log.info("Loading of messages completed! Pausing all owned partitions...");
- pauseAllOwnedPartions();
- log.info("Resuming normal operations...");
- loadInProgress = false;
- }
- }
- else
- {
- if (!records.isEmpty())
- {
- throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
- }
- }
+ ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+ log.debug("Fetched {} messages", records.count());
+ handleMessages(records);
}
catch (WakeupException e)
{
log.info("Exiting normally");
}
- private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+ private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
{
for (ConsumerRecord<String, AbstractMessageTo> record : records)
{
- UUID chatRoomId = UUID.fromString(record.key());
-
switch (record.value().getType())
{
- case COMMAND_CREATE_CHATROOM:
- createChatRoom(
- chatRoomId,
- (CommandCreateChatRoomTo) record.value(),
- record.partition());
- break;
-
- case EVENT_CHATMESSAGE_RECEIVED:
- Instant instant = Instant.ofEpochSecond(record.timestamp());
- LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
- loadChatMessage(
- chatRoomId,
- timestamp,
- record.offset(),
- (EventChatMessageReceivedTo) record.value(),
- record.partition());
+ case EVENT_CHATROOM_CREATED:
+ EventChatRoomCreated eventChatRoomCreated =
+ (EventChatRoomCreated) record.value();
+ createChatRoom(eventChatRoomCreated.toChatRoomInfo());
break;
default:
log.debug(
- "Ignoring message for chat-room {} with offset {}: {}",
- chatRoomId,
+ "Ignoring message for key={} with offset={}: {}",
+ record.key(),
record.offset(),
record.value());
}
}
}
- private void createChatRoom(
- UUID chatRoomId,
- CommandCreateChatRoomTo createChatRoomRequestTo,
- Integer partition)
- {
- log.info(
- "Loading ChatRoom {} for shard {} with buffer-size {}",
- chatRoomId,
- partition,
- bufferSize);
- KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
- ChatRoomData chatRoomData = new ChatRoomData(
- clock,
- service,
- bufferSize);
- putChatRoom(
- chatRoomId,
- createChatRoomRequestTo.getName(),
- partition,
- chatRoomData);
- }
-
-
private void createChatRoom(ChatRoomInfo chatRoomInfo)
{
- UUID id = chatRoomInfo.getId();
- log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
- KafkaChatMessageService service = new KafkaChatMessageService(this, id);
- ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
- putChatRoom(
- chatRoomInfo.getId(),
- chatRoomInfo.getName(),
- chatRoomInfo.getShard(),
- chatRoomData);
- }
-
- private void loadChatMessage(
- UUID chatRoomId,
- LocalDateTime timestamp,
- long offset,
- EventChatMessageReceivedTo chatMessageTo,
- int partition)
- {
- Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
- Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
-
- ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
- KafkaChatMessageService kafkaChatRoomService =
- (KafkaChatMessageService) chatRoomData.getChatRoomService();
-
- kafkaChatRoomService.persistMessage(message);
- }
-
- private boolean isLoadingCompleted()
- {
- return IntStream
- .range(0, numShards)
- .filter(shard -> isShardOwned[shard])
- .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
- }
-
- private void pauseAllOwnedPartions()
- {
- consumer.pause(IntStream
- .range(0, numShards)
- .filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> new TopicPartition(topic, shard))
- .toList());
- }
+ UUID chatRoomId = chatRoomInfo.getId();
+ Integer partition = chatRoomInfo.getShard();
-
- private void putChatRoom(
- UUID chatRoomId,
- String name,
- Integer partition,
- ChatRoomData chatRoomData)
- {
- if (this.chatRoomInfo[partition].containsKey(chatRoomId))
+ if (this.chatRoomInfo.containsKey(chatRoomId))
{
log.warn(
"Ignoring existing chat-room for {}: {}",
else
{
log.info(
- "Adding new chat-room to partition {}: {}",
+ "Adding new chat-room for partition {}: {}",
partition,
- chatRoomData);
-
- this.chatRoomInfo[partition].put(
- chatRoomId,
- new ChatRoomInfo(chatRoomId, name, partition));
- this.chatRoomData[partition].put(chatRoomId, chatRoomData);
- }
- }
-
- int[] getOwnedShards()
- {
- return IntStream
- .range(0, numShards)
- .filter(shard -> isShardOwned[shard])
- .toArray();
- }
-
- Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
- {
- if (loadInProgress)
- {
- return Mono.error(new LoadInProgressException());
- }
+ chatRoomId);
- if (!isShardOwned[shard])
- {
- return Mono.error(new ShardNotOwnedException(shard));
+ this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
}
-
- return Mono.justOrEmpty(chatRoomData[shard].get(id));
}
Flux<ChatRoomInfo> getChatRoomInfo()
{
- return Flux
- .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
- .filter(shard -> isShardOwned[shard])
- .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
+ return Flux.fromIterable(chatRoomInfo.values());
}
- Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
+ Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
- if (loadInProgress)
- {
- return Mono.error(new LoadInProgressException());
- }
-
- if (!isShardOwned[shard])
- {
- return Mono.error(new ShardNotOwnedException(shard));
- }
-
- return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
+ return Mono.fromSupplier(() -> chatRoomInfo.get(id));
}
}
public class KafkaChatHomeService implements ChatHomeService
{
private final int numPartitions;
- private final ChatRoomChannel chatRoomChannel;
+ private final InfoChannel infoChannel;
+ private final DataChannel dataChannel;
@Override
public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
- log.info("Sending create-command for chat rooom: id={}, name={}");
- return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+ int shard = selectShard(id);
+ log.info(
+ "Sending create-command for chat room: id={}, name={}, shard={}",
+ id,
+ name,
+ shard);
+ return infoChannel.sendChatRoomCreatedEvent(id, name, shard);
}
@Override
public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
- int shard = selectShard(id);
- return chatRoomChannel
- .getChatRoomInfo(shard, id)
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
- id,
- shard,
- chatRoomChannel.getOwnedShards())));
+ return infoChannel
+ .getChatRoomInfo(id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
@Override
public Flux<ChatRoomInfo> getChatRoomInfo()
{
- return chatRoomChannel.getChatRoomInfo();
+ return infoChannel.getChatRoomInfo();
}
@Override
public Mono<ChatRoomData> getChatRoomData(UUID id)
{
int shard = selectShard(id);
- return chatRoomChannel
+ return dataChannel
.getChatRoomData(shard, id)
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
id,
shard,
- chatRoomChannel.getOwnedShards())));
+ dataChannel.getOwnedShards())));
}
int selectShard(UUID chatRoomId)
@Slf4j
public class KafkaChatMessageService implements ChatMessageService
{
- private final ChatRoomChannel chatRoomChannel;
+ private final DataChannel dataChannel;
private final UUID chatRoomId;
private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
LocalDateTime timestamp,
String text)
{
- return chatRoomChannel
+ return dataChannel
.sendChatMessage(chatRoomId, key, timestamp, text)
.doOnSuccess(message -> persistMessage(message));
}
package de.juplo.kafka.chat.backend.implementation.kafka;
+import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
@Slf4j
public class KafkaServicesApplicationRunner implements ApplicationRunner
{
- private final ConsumerTaskExecutor chatRoomChannelTaskExecutor;
+ private final ConsumerTaskRunner consumerTaskRunner;
@Override
public void run(ApplicationArguments args) throws Exception
{
- chatRoomChannelTaskExecutor.executeConsumerTask();
+ consumerTaskRunner.run();
+ }
+
+ @PreDestroy
+ public void joinConsumerTasks()
+ {
+ consumerTaskRunner.joinConsumerTasks();
}
}
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
public class KafkaServicesConfiguration
{
@Bean
- ConsumerTaskExecutor chatRoomChannelTaskExecutor(
+ ConsumerTaskRunner consumerTaskRunner(
+ ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
+ {
+ return new ConsumerTaskRunner(
+ infoChannelConsumerTaskExecutor,
+ dataChannelConsumerTaskExecutor);
+ }
+
+ @Bean
+ ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
ThreadPoolTaskExecutor taskExecutor,
- ChatRoomChannel chatRoomChannel,
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
- ConsumerTaskExecutor.WorkAssignor workAssignor)
+ InfoChannel infoChannel,
+ Consumer<String, AbstractMessageTo> infoChannelConsumer,
+ ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor)
{
return new ConsumerTaskExecutor(
taskExecutor,
- chatRoomChannel,
- chatRoomChannelConsumer,
- workAssignor);
+ infoChannel,
+ infoChannelConsumer,
+ infoChannelWorkAssignor);
+ }
+
+ @Bean
+ ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor(
+ ChatBackendProperties properties)
+ {
+ return consumer ->
+ {
+ String topic = properties.getKafka().getInfoChannelTopic();
+ List<TopicPartition> partitions = consumer
+ .partitionsFor(topic)
+ .stream()
+ .map(partitionInfo ->
+ new TopicPartition(topic, partitionInfo.partition()))
+ .toList();
+ consumer.assign(partitions);
+ };
}
@Bean
- ConsumerTaskExecutor.WorkAssignor workAssignor(
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
+ ThreadPoolTaskExecutor taskExecutor,
+ DataChannel dataChannel,
+ Consumer<String, AbstractMessageTo> dataChannelConsumer,
+ ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor)
+ {
+ return new ConsumerTaskExecutor(
+ taskExecutor,
+ dataChannel,
+ dataChannelConsumer,
+ dataChannelWorkAssignor);
+ }
+
+ @Bean
+ ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
ChatBackendProperties properties,
- ChatRoomChannel chatRoomChannel)
+ DataChannel dataChannel)
{
return consumer ->
{
List<String> topics =
- List.of(properties.getKafka().getChatRoomChannelTopic());
- consumer.subscribe(topics, chatRoomChannel);
+ List.of(properties.getKafka().getDataChannelTopic());
+ consumer.subscribe(topics, dataChannel);
};
}
@Bean
ChatHomeService kafkaChatHome(
ChatBackendProperties properties,
- ChatRoomChannel chatRoomChannel)
+ InfoChannel infoChannel,
+ DataChannel dataChannel)
{
return new KafkaChatHomeService(
properties.getKafka().getNumPartitions(),
- chatRoomChannel);
+ infoChannel,
+ dataChannel);
}
@Bean
- ChatRoomChannel chatRoomChannel(
+ InfoChannel infoChannel(
ChatBackendProperties properties,
- Producer<String, AbstractMessageTo> chatRoomChannelProducer,
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+ Producer<String, AbstractMessageTo> producer,
+ Consumer<String, AbstractMessageTo> infoChannelConsumer)
+ {
+ return new InfoChannel(
+ properties.getKafka().getInfoChannelTopic(),
+ producer,
+ infoChannelConsumer);
+ }
+
+ @Bean
+ DataChannel dataChannel(
+ ChatBackendProperties properties,
+ Producer<String, AbstractMessageTo> producer,
+ Consumer<String, AbstractMessageTo> dataChannelConsumer,
ZoneId zoneId,
Clock clock)
{
- return new ChatRoomChannel(
- properties.getKafka().getChatRoomChannelTopic(),
- chatRoomChannelProducer,
- chatRoomChannelConsumer,
+ return new DataChannel(
+ properties.getKafka().getDataChannelTopic(),
+ producer,
+ dataChannelConsumer,
zoneId,
properties.getKafka().getNumPartitions(),
properties.getChatroomBufferSize(),
}
@Bean
- Producer<String, AbstractMessageTo> chatRoomChannelProducer(
+ Producer<String, AbstractMessageTo> producer(
Properties defaultProducerProperties,
ChatBackendProperties chatBackendProperties,
StringSerializer stringSerializer,
defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
properties.put(
ProducerConfig.CLIENT_ID_CONFIG,
- chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
return new KafkaProducer<>(
properties,
stringSerializer,
}
@Bean
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer(
+ Consumer<String, AbstractMessageTo> infoChannelConsumer(
+ Properties defaultConsumerProperties,
+ ChatBackendProperties chatBackendProperties,
+ StringDeserializer stringDeserializer,
+ JsonDeserializer<AbstractMessageTo> messageDeserializer)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
+ properties.put(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "info_channel");
+ return new KafkaConsumer<>(
+ properties,
+ stringDeserializer,
+ messageDeserializer);
+ }
+
+ @Bean
+ Consumer<String, AbstractMessageTo> dataChannelConsumer(
Properties defaultConsumerProperties,
ChatBackendProperties chatBackendProperties,
StringDeserializer stringDeserializer,
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
properties.put(
ConsumerConfig.CLIENT_ID_CONFIG,
- chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
properties.put(
ConsumerConfig.GROUP_ID_CONFIG,
- "chatroom_channel");
+ "data_channel");
return new KafkaConsumer<>(
properties,
stringDeserializer,
String typeMappings ()
{
return
- "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+ "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
"event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
}
public enum ToType {
COMMAND_CREATE_CHATROOM,
EVENT_CHATMESSAGE_RECEIVED,
+ EVENT_CHATROOM_CREATED,
}
@Getter
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import lombok.*;
-package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;public class EventChatRoomCreated {
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.util.UUID;
+
+
+/**
+ * Message that is sent through the info-channel, when a new chat-room was
+ * created.
+ * <p>
+ * Carries the id, the name and the shard of the created chat-room and can be
+ * converted back into the domain-object via {@link #toChatRoomInfo()}.
+ */
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventChatRoomCreated extends AbstractMessageTo
+{
+  private String id;
+  private String name;
+  private Integer shard;
+
+
+  public EventChatRoomCreated()
+  {
+    super(ToType.EVENT_CHATROOM_CREATED);
+  }
+
+
+  // Converts the transferred data back into the domain-object.
+  public ChatRoomInfo toChatRoomInfo()
+  {
+    return new ChatRoomInfo(UUID.fromString(id), name, shard);
+  }
+
+  // Static factory, that serializes the given domain-data into a new event.
+  public static EventChatRoomCreated of(UUID id, String name, Integer shard)
+  {
+    EventChatRoomCreated event = new EventChatRoomCreated();
+
+    event.setId(id.toString());
+    event.setName(name);
+    event.setShard(shard);
+
+    return event;
+  }
}
@Getter(AccessLevel.PACKAGE)
@Setter(AccessLevel.PACKAGE)
@EqualsAndHashCode(of = { "id" })
-@ToString(of = { "id", "shard", "name" })
+@ToString(of = { "id", "name" })
@Document
public class ChatRoomTo
{
@Id
private String id;
- private Integer shard;
private String name;
public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
{
return new ChatRoomTo(
chatRoomInfo.getId().toString(),
- chatRoomInfo.getShard(),
chatRoomInfo.getName());
}
}
.map(chatRoomTo ->
{
UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
- int shard = shardingStrategy.selectShard(chatRoomId);
-
- log.info(
- "{} - old shard: {}, new shard: {}",
- chatRoomId,
- chatRoomTo.getShard(),
- shard);
-
- return new ChatRoomInfo(
- chatRoomId,
- chatRoomTo.getName(),
- shard);
+ return new ChatRoomInfo(chatRoomId, chatRoomTo.getName());
});
}
package de.juplo.kafka.chat.backend;
-import de.juplo.kafka.chat.backend.implementation.kafka.ChatRoomChannel;
-import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesApplicationRunner;
+import de.juplo.kafka.chat.backend.implementation.kafka.*;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
+ "spring.main.allow-bean-definition-overriding=true",
"chat.backend.services=kafka",
"chat.backend.kafka.client-id-PREFIX=TEST",
"chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+ "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
+ "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
"chat.backend.kafka.num-partitions=10",
})
-@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@EmbeddedKafka(
+ topics = { INFO_TOPIC, DATA_TOPIC },
+ partitions = 10)
@Slf4j
class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
{
- final static String TOPIC = "KAFKA_CONFIGURATION_IT";
-
- static CompletableFuture<Void> CONSUMER_JOB;
+ final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
+ final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
@MockBean
KafkaServicesApplicationRunner kafkaServicesApplicationRunner;
@BeforeAll
public static void sendAndLoadStoredData(
@Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired Consumer chatRoomChannelConsumer,
- @Autowired ThreadPoolTaskExecutor taskExecutor,
- @Autowired ChatRoomChannel chatRoomChannel)
+ @Autowired ConsumerTaskRunner consumerTaskRunner)
{
- send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+ send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
+ send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+ send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+ send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+ send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
- List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
- chatRoomChannelConsumer.assign(assignedPartitions);
- chatRoomChannel.onPartitionsAssigned(assignedPartitions);
- CONSUMER_JOB = taskExecutor
- .submitCompletable(chatRoomChannel)
- .exceptionally(e ->
- {
- log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
- return null;
- });
+ consumerTaskRunner.run();
}
- static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+ static void send(
+ KafkaTemplate<String, String> kafkaTemplate,
+ String topic,
+ String key,
+ String value,
+ String typeId)
{
- ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
record.headers().add("__TypeId__", typeId.getBytes());
SendResult<String, String> result = kafkaTemplate.send(record).join();
log.info(
value,
new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
}
-
@AfterAll
- static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
+ static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
+ {
+ consumerTaskRunner.joinConsumerTasks();
+ }
+
+
+ @TestConfiguration
+ @EnableConfigurationProperties(ChatBackendProperties.class)
+ @Import(KafkaServicesConfiguration.class)
+ static class KafkaConfigurationITConfiguration
{
- log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
- chatRoomChannelConsumer.wakeup();
- log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
- CONSUMER_JOB.join();
- log.info("Joined the consumer of the ChatRoomChannel");
+ @Bean
+ ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
+ DataChannel dataChannel)
+ {
+ return consumer ->
+ {
+ List<TopicPartition> assignedPartitions =
+ List.of(new TopicPartition(DATA_TOPIC, 2));
+ consumer.assign(assignedPartitions);
+ dataChannel.onPartitionsAssigned(assignedPartitions);
+ };
+ }
}
}
import java.util.List;
import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
-import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.TOPIC;
+import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.INFO_TOPIC;
@SpringBootTest(
"chat.backend.kafka.client-id-PREFIX=TEST",
"chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+ "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
+ "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
"chat.backend.kafka.num-partitions=" + NUM_SHARDS,
})
-@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@EmbeddedKafka(
+ topics = { INFO_TOPIC, DATA_TOPIC },
+ partitions = 10)
@Slf4j
public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
{
- final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
+ final static String INFO_TOPIC = "KAFKA_CHAT_HOME_TEST_INFO";
+ final static String DATA_TOPIC = "KAFKA_CHAT_HOME_TEST_DATA";
@TestConfiguration
static class KafkaChatHomeTestConfiguration
{
@Bean
- ConsumerTaskExecutor.WorkAssignor workAssignor(
- ChatRoomChannel chatRoomChannel)
+ ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
+ DataChannel dataChannel)
{
return consumer ->
{
List<TopicPartition> assignedPartitions =
- List.of(new TopicPartition(TOPIC, 2));
+ List.of(new TopicPartition(DATA_TOPIC, 2));
consumer.assign(assignedPartitions);
- chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+ dataChannel.onPartitionsAssigned(assignedPartitions);
};
}
@BeforeAll
public static void sendAndLoadStoredData(
- @Autowired ConsumerTaskExecutor consumerTaskExecutor,
- @Autowired KafkaTemplate<String, String> messageTemplate)
+ @Autowired ConsumerTaskRunner consumerTaskRunner,
+ @Autowired KafkaTemplate<String, String> messageTemplate) throws InterruptedException
{
- send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+ send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
+ send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+ send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+ send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+ send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
- consumerTaskExecutor.executeConsumerTask();
+ consumerTaskRunner.run();
}
- static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+ static void send(
+ KafkaTemplate<String, String> kafkaTemplate,
+ String topic,
+ String key,
+ String value,
+ String typeId)
{
- ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
record.headers().add("__TypeId__", typeId.getBytes());
SendResult<String, String> result = kafkaTemplate.send(record).join();
log.info(
}
@AfterAll
- static void joinConsumerJob(@Autowired ConsumerTaskExecutor consumerTaskExecutor)
+ static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
{
- consumerTaskExecutor.joinConsumerTaskJob();
+ consumerTaskRunner.joinConsumerTasks();
}
}
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;