import java.time.*;
import java.util.*;
-import java.util.function.Function;
import java.util.stream.IntStream;
@Slf4j
public class DataChannel implements Runnable, ConsumerRebalanceListener
{
+ private final String instanceId;
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private final Consumer<String, AbstractMessageTo> consumer;
public DataChannel(
+ String instanceId,
String topic,
Producer<String, AbstractMessageTo> producer,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
InfoChannel infoChannel)
{
log.debug(
- "Creating DataChannel for topic {} with {} partitions",
+ "{}: Creating DataChannel for topic {} with {} partitions",
+ instanceId,
topic,
numShards);
+ this.instanceId = instanceId;
this.topic = topic;
this.consumer = dataChannelConsumer;
this.producer = producer;
if (!isShardOwned[shard])
{
- return Mono.error(new ShardNotOwnedException(shard));
+ return Mono.error(new ShardNotOwnedException(instanceId, shard));
}
return infoChannel