From: Kai Moritz Date: Sun, 20 Aug 2023 08:08:04 +0000 (+0200) Subject: WIP X-Git-Tag: rebase--2023-08-20~13 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=08e679fa4ec48797df9750637831bacec7826ea3;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java deleted file mode 100644 index 08975a9d..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ /dev/null @@ -1,373 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.Message; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.*; -import java.util.*; -import java.util.stream.IntStream; - - -@Slf4j -public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener -{ - private final String topic; - private final Producer producer; - private final Consumer consumer; - private final ZoneId zoneId; - private final int numShards; - private final int bufferSize; - private final Clock clock; - private final boolean[] isShardOwned; - private final long[] currentOffset; - private final long[] nextOffset; - private final Map[] chatrooms; - - private boolean running; - @Getter - private volatile boolean loadInProgress; - - - public ChatMessageChannel( - String topic, - Producer producer, - Consumer consumer, - ZoneId zoneId, - int numShards, - int bufferSize, - Clock clock) - { - log.debug( - "Creating ChatMessageChannel for topic {} with {} partitions", - topic, - numShards); - this.topic = topic; - this.consumer = consumer; - this.producer = producer; - this.zoneId = zoneId; - this.numShards = numShards; - this.bufferSize = bufferSize; - this.clock = clock; - this.isShardOwned = new boolean[numShards]; - this.currentOffset = new long[numShards]; - this.nextOffset = new long[numShards]; - this.chatrooms = new Map[numShards]; - IntStream - .range(0, numShards) - .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); - } - - - - Mono sendCreateChatRoomRequest( - UUID chatRoomId, - String name) - { - CreateChatRoomCommandTo createChatRoomRequestTo = CreateChatRoomCommandTo.of(name); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - chatRoomId.toString(), - createChatRoomRequestTo); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); - ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); - createChatRoom(chatRoomInfo); - sink.success(chatRoomInfo); - } - else - { - // On send-failure - log.error( - "Could not send create-request for chat room (id={}, name={}): {}", - chatRoomId, - name, - exception); - sink.error(exception); - } - })); - }); - } - - Mono sendChatMessage( - UUID chatRoomId, - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - null, - zdt.toEpochSecond(), - chatRoomId.toString(), - ChatMessageReceivedEventTo.of(key.getUsername(), key.getMessageId(), text)); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - Message message = new Message(key, metadata.offset(), timestamp, text); - log.info("Successfully send message {}", message); - sink.success(message); - } - else - { - // On send-failure - log.error( - "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", - chatRoomId, - key, - timestamp, - text, - exception); - sink.error(exception); - } - })); - }); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - log.info("Newly assigned partitions! Pausing normal operations..."); - loadInProgress = true; - - consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = true; - this.currentOffset[partition] = currentOffset; - - log.info( - "Partition assigned: {} - loading messages: next={} -> current={}", - partition, - nextOffset[partition], - currentOffset); - - consumer.seek(topicPartition, nextOffset[partition]); - }); - - consumer.resume(partitions); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(topicPartition -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = false; - log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); - }); - } - - @Override - public void onPartitionsLost(Collection partitions) - { - log.warn("Lost partitions: {}, partitions"); - // TODO: Muss auf den Verlust anders reagiert werden? - onPartitionsRevoked(partitions); - } - - @Override - public void run() - { - consumer.subscribe(List.of(topic), this); - - running = true; - - while (running) - { - try - { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); - log.info("Fetched {} messages", records.count()); - - if (loadInProgress) - { - loadChatRoom(records); - - if (isLoadingCompleted()) - { - log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); - loadInProgress = false; - } - } - else - { - if (!records.isEmpty()) - { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); - } - } - } - catch (WakeupException e) - { - log.info("Received WakeupException, exiting!"); - running = false; - } - } - - log.info("Exiting normally"); - } - - void loadChatRoom(ConsumerRecords records) - { - for (ConsumerRecord record : records) - { - UUID chatRoomId = UUID.fromString(record.key()); - - switch (record.value().getType()) - { - case CREATE_CHATROOM_COMMAND: - createChatRoom( - chatRoomId, - (CreateChatRoomCommandTo) record.value(), - record.partition()); - break; - - case CHATMESSAGE_EVENT: - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); - loadChatMessage( - chatRoomId, - timestamp, - record.offset(), - (ChatMessageReceivedEventTo) record.value(), - record.partition()); - break; - - default: - log.debug( - "Ignoring message for chat-room {} with offset {}: {}", - chatRoomId, - record.offset(), - record.value()); - } - - nextOffset[record.partition()] = record.offset() + 1; - } - } - - void createChatRoom( - UUID chatRoomId, - CreateChatRoomCommandTo createChatRoomRequestTo, - int partition) - { - log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); - KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); - ChatRoom chatRoom = new ChatRoom( - chatRoomId, - createChatRoomRequestTo.getName(), - partition, - clock, - service, - bufferSize); - putChatRoom(chatRoom); - } - - - void createChatRoom(ChatRoomInfo chatRoomInfo) - { - UUID id = chatRoomInfo.getId(); - String name = chatRoomInfo.getName(); - int shard = chatRoomInfo.getShard(); - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatRoomService service = new KafkaChatRoomService(this, id); - ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - putChatRoom(chatRoom); - } - - void loadChatMessage( - UUID chatRoomId, - LocalDateTime timestamp, - long offset, - ChatMessageReceivedEventTo chatMessageTo, - int partition) - { - Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); - Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - - ChatRoom chatRoom = chatrooms[partition].get(chatRoomId); - KafkaChatRoomService kafkaChatRoomService = - (KafkaChatRoomService) chatRoom.getChatRoomService(); - - kafkaChatRoomService.persistMessage(message); - } - - boolean isLoadingCompleted() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); - } - - void pauseAllOwnedPartions() - { - consumer.pause(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> new TopicPartition(topic, shard)) - .toList()); - } - - - private void putChatRoom(ChatRoom chatRoom) - { - Integer partition = chatRoom.getShard(); - UUID chatRoomId = chatRoom.getId(); - if (chatrooms[partition].containsKey(chatRoomId)) - { - log.warn("Ignoring existing chat-room: " + chatRoom); - } - else - { - log.info( - "Adding new chat-room to partition {}: {}", - partition, - chatRoom); - - chatrooms[partition].put(chatRoomId, chatRoom); - } - } - - Mono getChatRoom(int shard, UUID id) - { - return Mono.justOrEmpty(chatrooms[shard].get(id)); - } - - Flux getChatRooms() - { - return Flux - .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) - .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values())); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java new file mode 100644 index 00000000..1308946e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -0,0 +1,373 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.*; +import java.util.*; +import java.util.stream.IntStream; + + +@Slf4j +public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +{ + private final String topic; + private final Producer producer; + private final Consumer consumer; + private final ZoneId zoneId; + private final int numShards; + private final int bufferSize; + private final Clock clock; + private final boolean[] isShardOwned; + private final long[] currentOffset; + private final long[] nextOffset; + private final Map[] chatrooms; + + private boolean running; + @Getter + private volatile boolean loadInProgress; + + + public ChatRoomChannel( + String topic, + Producer producer, + Consumer consumer, + ZoneId zoneId, + int numShards, + int bufferSize, + Clock clock) + { + log.debug( + "Creating ChatMessageChannel for topic {} with {} partitions", + topic, + numShards); + this.topic = topic; + this.consumer = consumer; + this.producer = producer; + this.zoneId = zoneId; + this.numShards = numShards; + this.bufferSize = bufferSize; + this.clock = clock; + this.isShardOwned = new boolean[numShards]; + this.currentOffset = new long[numShards]; + this.nextOffset = new long[numShards]; + this.chatrooms = new Map[numShards]; + IntStream + .range(0, numShards) + .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); + } + + + + Mono sendCreateChatRoomRequest( + UUID chatRoomId, + String name) + { + CreateChatRoomCommandTo createChatRoomRequestTo = CreateChatRoomCommandTo.of(name); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + chatRoomId.toString(), + createChatRoomRequestTo); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); + createChatRoom(chatRoomInfo); + sink.success(chatRoomInfo); + } + else + { + // On send-failure + log.error( + "Could not send create-request for chat room (id={}, name={}): {}", + chatRoomId, + name, + exception); + sink.error(exception); + } + })); + }); + } + + Mono sendChatMessage( + UUID chatRoomId, + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + null, + zdt.toEpochSecond(), + chatRoomId.toString(), + ChatMessageReceivedEventTo.of(key.getUsername(), key.getMessageId(), text)); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + // On successful send + Message message = new Message(key, metadata.offset(), timestamp, text); + log.info("Successfully send message {}", message); + sink.success(message); + } + else + { + // On send-failure + log.error( + "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", + chatRoomId, + key, + timestamp, + text, + exception); + sink.error(exception); + } + })); + }); + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info("Newly assigned partitions! Pausing normal operations..."); + loadInProgress = true; + + consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = true; + this.currentOffset[partition] = currentOffset; + + log.info( + "Partition assigned: {} - loading messages: next={} -> current={}", + partition, + nextOffset[partition], + currentOffset); + + consumer.seek(topicPartition, nextOffset[partition]); + }); + + consumer.resume(partitions); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(topicPartition -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = false; + log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + }); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + log.warn("Lost partitions: {}, partitions"); + // TODO: Muss auf den Verlust anders reagiert werden? + onPartitionsRevoked(partitions); + } + + @Override + public void run() + { + consumer.subscribe(List.of(topic), this); + + running = true; + + while (running) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + log.info("Fetched {} messages", records.count()); + + if (loadInProgress) + { + loadChatRoom(records); + + if (isLoadingCompleted()) + { + log.info("Loading of messages completed! Pausing all owned partitions..."); + pauseAllOwnedPartions(); + log.info("Resuming normal operations..."); + loadInProgress = false; + } + } + else + { + if (!records.isEmpty()) + { + throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); + } + } + } + catch (WakeupException e) + { + log.info("Received WakeupException, exiting!"); + running = false; + } + } + + log.info("Exiting normally"); + } + + void loadChatRoom(ConsumerRecords records) + { + for (ConsumerRecord record : records) + { + UUID chatRoomId = UUID.fromString(record.key()); + + switch (record.value().getType()) + { + case CREATE_CHATROOM_COMMAND: + createChatRoom( + chatRoomId, + (CreateChatRoomCommandTo) record.value(), + record.partition()); + break; + + case CHATMESSAGE_EVENT: + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + loadChatMessage( + chatRoomId, + timestamp, + record.offset(), + (ChatMessageReceivedEventTo) record.value(), + record.partition()); + break; + + default: + log.debug( + "Ignoring message for chat-room {} with offset {}: {}", + chatRoomId, + record.offset(), + record.value()); + } + + nextOffset[record.partition()] = record.offset() + 1; + } + } + + void createChatRoom( + UUID chatRoomId, + CreateChatRoomCommandTo createChatRoomRequestTo, + int partition) + { + log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); + ChatRoom chatRoom = new ChatRoom( + chatRoomId, + createChatRoomRequestTo.getName(), + partition, + clock, + service, + bufferSize); + putChatRoom(chatRoom); + } + + + void createChatRoom(ChatRoomInfo chatRoomInfo) + { + UUID id = chatRoomInfo.getId(); + String name = chatRoomInfo.getName(); + int shard = chatRoomInfo.getShard(); + log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, id); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + putChatRoom(chatRoom); + } + + void loadChatMessage( + UUID chatRoomId, + LocalDateTime timestamp, + long offset, + ChatMessageReceivedEventTo chatMessageTo, + int partition) + { + Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); + Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); + + ChatRoom chatRoom = chatrooms[partition].get(chatRoomId); + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoom.getChatRoomService(); + + kafkaChatRoomService.persistMessage(message); + } + + boolean isLoadingCompleted() + { + return IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); + } + + void pauseAllOwnedPartions() + { + consumer.pause(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> new TopicPartition(topic, shard)) + .toList()); + } + + + private void putChatRoom(ChatRoom chatRoom) + { + Integer partition = chatRoom.getShard(); + UUID chatRoomId = chatRoom.getId(); + if (chatrooms[partition].containsKey(chatRoomId)) + { + log.warn("Ignoring existing chat-room: " + chatRoom); + } + else + { + log.info( + "Adding new chat-room to partition {}: {}", + partition, + chatRoom); + + chatrooms[partition].put(chatRoomId, chatRoom); + } + } + + Mono getChatRoom(int shard, UUID id) + { + return Mono.justOrEmpty(chatrooms[shard].get(id)); + } + + Flux getChatRooms() + { + return Flux + .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) + .filter(shard -> isShardOwned[shard]) + .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values())); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 77790bd6..26887a95 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -2,8 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -18,7 +16,7 @@ import java.util.*; public class KafkaChatHome implements ChatHome { private final KafkaLikeShardingStrategy shardingStrategy; - private final ChatMessageChannel chatMessageChanel; + private final ChatRoomChannel chatMessageChanel; @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java index c46529d8..825f16eb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -13,12 +13,12 @@ import java.util.UUID; @Slf4j public class KafkaChatRoomFactory implements ChatRoomFactory { - private final ChatMessageChannel chatMessageChannel; + private final ChatRoomChannel chatRoomChannel; @Override public Mono createChatRoom(UUID id, String name) { log.info("Sending create-request for chat rooom: id={}, name={}"); - return chatMessageChannel.sendCreateChatRoomRequest(id, name); + return chatRoomChannel.sendCreateChatRoomRequest(id, name); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 77ecf1ca..7b2cc0b1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -15,7 +15,7 @@ import java.util.UUID; @Slf4j public class KafkaChatRoomService implements ChatRoomService { - private final ChatMessageChannel chatMessageChannel; + private final ChatRoomChannel chatRoomChannel; private final UUID chatRoomId; private final LinkedHashMap messages = new LinkedHashMap<>(); @@ -27,7 +27,7 @@ public class KafkaChatRoomService implements ChatRoomService LocalDateTime timestamp, String text) { - return chatMessageChannel + return chatRoomChannel .sendChatMessage(chatRoomId, key, timestamp, text) .doOnSuccess(message -> persistMessage(message)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index f0dc3155..fac35825 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -28,7 +28,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner ConfigurableApplicationContext context; @Autowired - ChatMessageChannel chatMessageChannel; + ChatRoomChannel chatRoomChannel; @Autowired Consumer chatMessageChannelConsumer; @@ -40,7 +40,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner { log.info("Starting the consumer for the ChatMessageChannel"); chatMessageChannelConsumerJob = taskExecutor - .submitCompletable(chatMessageChannel) + .submitCompletable(chatRoomChannel) .exceptionally(e -> { log.error("The consumer for the ChatMessageChannel exited abnormally!", e); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 84919c71..724739bf 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -34,15 +34,15 @@ public class KafkaServicesConfiguration @Bean ChatHome kafkaChatHome( KafkaLikeShardingStrategy shardingStrategy, - ChatMessageChannel chatMessageChannel) + ChatRoomChannel chatRoomChannel) { - return new KafkaChatHome(shardingStrategy, chatMessageChannel); + return new KafkaChatHome(shardingStrategy, chatRoomChannel); } @Bean - KafkaChatRoomFactory chatRoomFactory(ChatMessageChannel chatMessageChannel) + KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel) { - return new KafkaChatRoomFactory(chatMessageChannel); + return new KafkaChatRoomFactory(chatRoomChannel); } @Bean @@ -52,14 +52,14 @@ public class KafkaServicesConfiguration } @Bean - ChatMessageChannel chatMessageChannel( + ChatRoomChannel chatMessageChannel( ChatBackendProperties properties, Producer chatMessageChannelProducer, Consumer chatMessageChannelConsumer, ZoneId zoneId, Clock clock) { - return new ChatMessageChannel( + return new ChatRoomChannel( properties.getKafka().getMessageChannelTopic(), chatMessageChannelProducer, chatMessageChannelConsumer,