projects
/
demos
/
kafka
/
chat
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
feat: Introduced interface `ShardingPublisherStrategy`
[demos/kafka/chat]
/
src
/
main
/
java
/
de
/
juplo
/
kafka
/
chat
/
backend
/
implementation
/
kafka
/
DataChannel.java
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
index
9cafbaa
..
2fa4998
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
@@
-38,6
+38,7
@@
public class DataChannel implements Runnable, ConsumerRebalanceListener
private final long[] nextOffset;
private final Map<UUID, ChatRoomData>[] chatRoomData;
private final InfoChannel infoChannel;
private final long[] nextOffset;
private final Map<UUID, ChatRoomData>[] chatRoomData;
private final InfoChannel infoChannel;
+ private final ShardingPublisherStrategy shardingPublisherStrategy;
private boolean running;
@Getter
private boolean running;
@Getter
@@
-53,7
+54,8
@@
public class DataChannel implements Runnable, ConsumerRebalanceListener
int numShards,
int bufferSize,
Clock clock,
int numShards,
int bufferSize,
Clock clock,
- InfoChannel infoChannel)
+ InfoChannel infoChannel,
+ ShardingPublisherStrategy shardingPublisherStrategy)
{
log.debug(
"{}: Creating DataChannel for topic {} with {} partitions",
{
log.debug(
"{}: Creating DataChannel for topic {} with {} partitions",
@@
-76,6
+78,7
@@
public class DataChannel implements Runnable, ConsumerRebalanceListener
.range(0, numShards)
.forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
this.infoChannel = infoChannel;
.range(0, numShards)
.forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
this.infoChannel = infoChannel;
+ this.shardingPublisherStrategy = shardingPublisherStrategy;
}
}
@@
-142,6
+145,13
@@
public class DataChannel implements Runnable, ConsumerRebalanceListener
consumer.seek(topicPartition, nextOffset[partition]);
infoChannel.sendShardAssignedEvent(partition);
consumer.seek(topicPartition, nextOffset[partition]);
infoChannel.sendShardAssignedEvent(partition);
+ shardingPublisherStrategy
+ .publishOwnership(partition)
+ .doOnNext(instanceId -> log.info(
+ "Instance {} was published as owner of shard {}",
+ instanceId,
+ partition))
+ .subscribe();
});
consumer.resume(partitions);
});
consumer.resume(partitions);