projects
/
demos
/
kafka
/
chat
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (from parent 1:
f747532
)
WIP
author
Kai Moritz
<kai@juplo.de>
Sun, 26 Feb 2023 18:13:25 +0000
(19:13 +0100)
committer
Kai Moritz
<kai@juplo.de>
Sun, 26 Feb 2023 18:13:25 +0000
(19:13 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
index
15d968a
..
465775f
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
@@
-2,9
+2,12
@@
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.Instant;
import java.time.LocalDateTime;
@@
-18,8
+21,10
@@
import java.util.UUID;
@Slf4j
class ChatHomeLoader
{
@Slf4j
class ChatHomeLoader
{
+ private final Producer<String, MessageTo> producer;
private final long offsetOfFirstUnseenMessage;
private final ZoneId zoneId;
private final long offsetOfFirstUnseenMessage;
private final ZoneId zoneId;
+ @Getter
private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
@@
-33,6
+38,8
@@
class ChatHomeLoader
*/
boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
{
*/
boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
{
+ TopicPartition topicPartition =
+ new TopicPartition(record.topic(), record.partition());
Message.MessageKey messageKey = Message.MessageKey.of(
record.value().getUser(),
record.value().getId());
Message.MessageKey messageKey = Message.MessageKey.of(
record.value().getUser(),
record.value().getId());
@@
-41,10
+48,9
@@
class ChatHomeLoader
{
// All messages consumed: DONE!
log.trace(
{
// All messages consumed: DONE!
log.trace(
- "Ignoring unseen message {}
: topic={}, partition=
{}, offset={}",
+ "Ignoring unseen message {}
on
{}, offset={}",
messageKey,
messageKey,
- record.topic(),
- record.partition(),
+ topicPartition,
record.offset());
return true;
}
record.offset());
return true;
}
@@
-54,15
+60,13
@@
class ChatHomeLoader
KafkaChatRoomService service = kafkaChatRoomServiceMap
.computeIfAbsent(record.key(), key ->
KafkaChatRoomService service = kafkaChatRoomServiceMap
.computeIfAbsent(record.key(), key ->
- {
- });
+ new KafkaChatRoomService(producer, topicPartition));
service.addMessage(new Message(
messageKey,
record.offset(),
time,
service.addMessage(new Message(
messageKey,
record.offset(),
time,
- record.value().getText()
- ));
+ record.value().getText()));
return false;
}
return false;
}
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java
b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java
index
c4737a1
..
37c4e50
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java
@@
-10,9
+10,7
@@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
-import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.LinkedHashMap;
-import java.util.UUID;
@Slf4j
@Slf4j
@@
-20,8
+18,6
@@
public class KafkaChatRoomService implements ChatRoomService
{
private final Producer<String, MessageTo> producer;
private final TopicPartition tp;
{
private final Producer<String, MessageTo> producer;
private final TopicPartition tp;
- private final UUID chatRoomId;
- private final ZoneOffset zoneOffset;
private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
@@
-30,14
+26,10
@@
public class KafkaChatRoomService implements ChatRoomService
public KafkaChatRoomService(
Producer<String, MessageTo> producer,
public KafkaChatRoomService(
Producer<String, MessageTo> producer,
- TopicPartition tp,
- UUID chatRoomId,
- ZoneOffset zoneOffset)
+ TopicPartition tp)
{
this.producer = producer;
this.tp = tp;
{
this.producer = producer;
this.tp = tp;
- this.chatRoomId = chatRoomId;
- this.zoneOffset = zoneOffset;
this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
}
this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
}