projects
/
demos
/
kafka
/
chat
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
WIP
[demos/kafka/chat]
/
src
/
main
/
java
/
de
/
juplo
/
kafka
/
chat
/
backend
/
persistence
/
kafka
/
ChatHomeLoader.java
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
index
365bb5e
..
15d968a
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
@@
-18,7
+18,7
@@
import java.util.UUID;
@Slf4j
class ChatHomeLoader
{
@Slf4j
class ChatHomeLoader
{
- private final long offsetOfFirst
New
Message;
+ private final long offsetOfFirst
Unseen
Message;
private final ZoneId zoneId;
private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
private final ZoneId zoneId;
private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
@@
-33,10
+33,19
@@
class ChatHomeLoader
*/
boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
{
*/
boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
{
- if (record.offset() >= offsetOfFirstNewMessage)
+ Message.MessageKey messageKey = Message.MessageKey.of(
+ record.value().getUser(),
+ record.value().getId());
+
+ if (record.offset() >= offsetOfFirstUnseenMessage)
{
// All messages consumed: DONE!
{
// All messages consumed: DONE!
- log.debug("I");
+ log.trace(
+ "Ignoring unseen message {}: topic={}, partition={}, offset={}",
+ messageKey,
+ record.topic(),
+ record.partition(),
+ record.offset());
return true;
}
return true;
}
@@
-49,9
+58,7
@@
class ChatHomeLoader
});
service.addMessage(new Message(
});
service.addMessage(new Message(
- Message.MessageKey.of(
- record.value().getUser(),
- record.value().getId()),
+ messageKey,
record.offset(),
time,
record.value().getText()
record.offset(),
time,
record.value().getText()