import reactor.core.publisher.Mono;
import java.time.*;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.stream.IntStream;
@Override
public void run()
{
- consumer.subscribe(List.of(topic));
+ consumer.subscribe(List.of(topic), this);
running = true;
running = false;
}
}
+
+ log.info("Exiting normally");
}
void loadMessages(ConsumerRecords<String, MessageTo> records)
return IntStream
.range(0, numShards)
.filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard])
- .collect(
- () -> Boolean.TRUE, // TODO: Boolean is immutable
- (acc, v) -> Boolean.valueOf(acc && v), // TODO: Boolean is immutable
- (a, b) -> Boolean.valueOf(a && b)); // TODO: Boolean is immutable
+ .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
}
void pauseAllOwnedPartions()