projects
/
demos
/
kafka
/
chat
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (from parent 1:
ffb82f7
)
WIP
author
Kai Moritz
<kai@juplo.de>
Sun, 26 Feb 2023 18:04:38 +0000
(19:04 +0100)
committer
Kai Moritz
<kai@juplo.de>
Sun, 26 Feb 2023 18:04:38 +0000
(19:04 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
index
365bb5e
..
15d968a
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
@@
-18,7
+18,7
@@
import java.util.UUID;
@Slf4j
class ChatHomeLoader
{
@Slf4j
class ChatHomeLoader
{
- private final long offsetOfFirst
New
Message;
+ private final long offsetOfFirst
Unseen
Message;
private final ZoneId zoneId;
private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
private final ZoneId zoneId;
private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
@@
-33,10
+33,19
@@
class ChatHomeLoader
*/
boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
{
*/
boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
{
- if (record.offset() >= offsetOfFirstNewMessage)
+ Message.MessageKey messageKey = Message.MessageKey.of(
+ record.value().getUser(),
+ record.value().getId());
+
+ if (record.offset() >= offsetOfFirstUnseenMessage)
{
// All messages consumed: DONE!
{
// All messages consumed: DONE!
- log.debug("I");
+ log.trace(
+ "Ignoring unseen message {}: topic={}, partition={}, offset={}",
+ messageKey,
+ record.topic(),
+ record.partition(),
+ record.offset());
return true;
}
return true;
}
@@
-49,9
+58,7
@@
class ChatHomeLoader
});
service.addMessage(new Message(
});
service.addMessage(new Message(
- Message.MessageKey.of(
- record.value().getUser(),
- record.value().getId()),
+ messageKey,
record.offset(),
time,
record.value().getText()
record.offset(),
time,
record.value().getText()
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
index
a5d63fd
..
e171bc5
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
@@
-18,7
+18,8
@@
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
private final Consumer<String, MessageTo> consumer;
private final String topic;
// private final long[] offsets; Erst mal immer alles neu einlesen
private final Consumer<String, MessageTo> consumer;
private final String topic;
// private final long[] offsets; Erst mal immer alles neu einlesen
- private final Map<UUID, ChatRoom>[] chatrooms;
+ private final Map<UUID, KafkaChatRoomService>[] kafkaChatRoomServiceMaps;
+ private final Map<UUID, ChatRoom>[] chatRoomMaps;
public KafkaChatHomeService(
public KafkaChatHomeService(
@@
-34,7
+35,8
@@
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
// {
// this.offsets[i] = 0l;
// }
// {
// this.offsets[i] = 0l;
// }
- this.chatrooms = new Map[numShards];
+ this.kafkaChatRoomServiceMaps = new Map[numShards];
+ this.chatRoomMaps = new Map[numShards];
}
}
@@
-50,6
+52,7
@@
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
}
int partition = tp.partition();
}
int partition = tp.partition();
+ kafkaChatRoomServiceMaps[partition] = new HashMap<>(); // TODO: reuse! Nicht immer alles neu laden
long unseenOffset = 0; // offsets[partition];
log.info(
long unseenOffset = 0; // offsets[partition];
log.info(
@@
-59,7
+62,7
@@
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
currentOffset);
consumer.seek(tp, unseenOffset);
currentOffset);
consumer.seek(tp, unseenOffset);
- chat
room
s[partition]
+ chat
RoomMap
s[partition]
.values()
.stream()
handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
.values()
.stream()
handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
@@
-101,7
+104,7
@@
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
(a, b) -> a.addAll(b));
for (int shard = 0; shard < numShards; shard++)
{
(a, b) -> a.addAll(b));
for (int shard = 0; shard < numShards; shard++)
{
- chat
room
s[shard] = owned.contains(shard)
+ chat
RoomMap
s[shard] = owned.contains(shard)
? new HashMap<>()
: null;
}
? new HashMap<>()
: null;
}
@@
-119,25
+122,25
@@
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
}
})
.toStream()
}
})
.toStream()
- .forEach(chatroom -> chat
room
s[chatroom.getShard()].put(chatroom.getId(), chatroom));
+ .forEach(chatroom -> chat
RoomMap
s[chatroom.getShard()].put(chatroom.getId(), chatroom));
}
@Override
public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
{
}
@Override
public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
{
- chat
room
s[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+ chat
RoomMap
s[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
return Mono.just(chatRoom);
}
@Override
public Mono<ChatRoom> getChatRoom(int shard, UUID id)
{
return Mono.just(chatRoom);
}
@Override
public Mono<ChatRoom> getChatRoom(int shard, UUID id)
{
- return Mono.justOrEmpty(chat
room
s[shard].get(id));
+ return Mono.justOrEmpty(chat
RoomMap
s[shard].get(id));
}
@Override
public Flux<ChatRoom> getChatRooms(int shard)
{
}
@Override
public Flux<ChatRoom> getChatRooms(int shard)
{
- return Flux.fromStream(chat
room
s[shard].values().stream());
+ return Flux.fromStream(chat
RoomMap
s[shard].values().stream());
}
}
}
}