projects
/
demos
/
kafka
/
chat
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
feat: Introduced interface `ShardingPublisherStrategy`
[demos/kafka/chat]
/
src
/
main
/
java
/
de
/
juplo
/
kafka
/
chat
/
backend
/
implementation
/
kafka
/
DataChannel.java
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
index
381152b
..
2fa4998
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
@@
-19,13
+19,13
@@
import reactor.core.publisher.Mono;
import java.time.*;
import java.util.*;
import java.time.*;
import java.util.*;
-import java.util.function.Function;
import java.util.stream.IntStream;
@Slf4j
public class DataChannel implements Runnable, ConsumerRebalanceListener
{
import java.util.stream.IntStream;
@Slf4j
public class DataChannel implements Runnable, ConsumerRebalanceListener
{
+ private final String instanceId;
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private final Consumer<String, AbstractMessageTo> consumer;
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private final Consumer<String, AbstractMessageTo> consumer;
@@
-38,6
+38,7
@@
public class DataChannel implements Runnable, ConsumerRebalanceListener
private final long[] nextOffset;
private final Map<UUID, ChatRoomData>[] chatRoomData;
private final InfoChannel infoChannel;
private final long[] nextOffset;
private final Map<UUID, ChatRoomData>[] chatRoomData;
private final InfoChannel infoChannel;
+ private final ShardingPublisherStrategy shardingPublisherStrategy;
private boolean running;
@Getter
private boolean running;
@Getter
@@
-45,6
+46,7
@@
public class DataChannel implements Runnable, ConsumerRebalanceListener
public DataChannel(
public DataChannel(
+ String instanceId,
String topic,
Producer<String, AbstractMessageTo> producer,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
String topic,
Producer<String, AbstractMessageTo> producer,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
@@
-52,12
+54,15
@@
public class DataChannel implements Runnable, ConsumerRebalanceListener
int numShards,
int bufferSize,
Clock clock,
int numShards,
int bufferSize,
Clock clock,
- InfoChannel infoChannel)
+ InfoChannel infoChannel,
+ ShardingPublisherStrategy shardingPublisherStrategy)
{
log.debug(
{
log.debug(
- "Creating DataChannel for topic {} with {} partitions",
+ "{}: Creating DataChannel for topic {} with {} partitions",
+ instanceId,
topic,
numShards);
topic,
numShards);
+ this.instanceId = instanceId;
this.topic = topic;
this.consumer = dataChannelConsumer;
this.producer = producer;
this.topic = topic;
this.consumer = dataChannelConsumer;
this.producer = producer;
@@
-73,6
+78,7
@@
public class DataChannel implements Runnable, ConsumerRebalanceListener
.range(0, numShards)
.forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
this.infoChannel = infoChannel;
.range(0, numShards)
.forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
this.infoChannel = infoChannel;
+ this.shardingPublisherStrategy = shardingPublisherStrategy;
}
}
@@
-138,6
+144,14
@@
public class DataChannel implements Runnable, ConsumerRebalanceListener
currentOffset);
consumer.seek(topicPartition, nextOffset[partition]);
currentOffset);
consumer.seek(topicPartition, nextOffset[partition]);
+ infoChannel.sendShardAssignedEvent(partition);
+ shardingPublisherStrategy
+ .publishOwnership(partition)
+ .doOnNext(instanceId -> log.info(
+ "Instance {} was published as owner of shard {}",
+ instanceId,
+ partition))
+ .subscribe();
});
consumer.resume(partitions);
});
consumer.resume(partitions);
@@
-151,6
+165,7
@@
public class DataChannel implements Runnable, ConsumerRebalanceListener
int partition = topicPartition.partition();
isShardOwned[partition] = false;
log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
int partition = topicPartition.partition();
isShardOwned[partition] = false;
log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+ infoChannel.sendShardRevokedEvent(partition);
});
}
});
}
@@
-289,7
+304,7
@@
public class DataChannel implements Runnable, ConsumerRebalanceListener
if (!isShardOwned[shard])
{
if (!isShardOwned[shard])
{
- return Mono.error(new ShardNotOwnedException(shard));
+ return Mono.error(new ShardNotOwnedException(
instanceId,
shard));
}
return infoChannel
}
return infoChannel