From 0f0c17dbfed140e3256015bfc5bc30760a5fd291 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 17:27:08 +0100 Subject: [PATCH] WIP:PartitionState -- ALIGN --- .../java/de/juplo/kafka/PartitionState.java | 244 ++---------------- 1 file changed, 16 insertions(+), 228 deletions(-) diff --git a/src/main/java/de/juplo/kafka/PartitionState.java b/src/main/java/de/juplo/kafka/PartitionState.java index fa9bba9..92df759 100644 --- a/src/main/java/de/juplo/kafka/PartitionState.java +++ b/src/main/java/de/juplo/kafka/PartitionState.java @@ -43,104 +43,27 @@ public class PartitionState implements Runnable, ConsumerRebalanceListener String clientId, String topic, Consumer consumer, - String stateTopic, - Producer producer) + String stateTopic) { this.id = clientId; this.topic = topic; this.consumer = consumer; this.stateTopic = stateTopic; - this.producer = producer; - workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); - workerThread.start(); - } - - - @Override - public void run() - { - try - { - log.info("{} - Fetching PartitionInfo for topic {}", id, topic); - int numPartitions = consumer.partitionsFor(topic).size(); - log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); - partitionStates = new State[numPartitions]; - for (int i=0; i records = - consumer.poll(Duration.ofSeconds(1)); - - int phase = phaser.getPhase(); - - assignedPartitions - .forEach(partition -> - { - seen[partition.partition()] = 0; - acked[partition.partition()] = 0; - done[partition.partition()] = false; - }); - - log.info("{} - Received {} messages in phase {}", id, records.count(), phase); - records - .partitions() - .forEach(partition -> - { - for (ConsumerRecord record : records.records(partition)) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - - done[partition.partition()] = true; - }); - - assignedPartitions - .forEach(partition -> - { - if (seen[partition.partition()] == 0) - { - int arrivedPhase = phaser.arrive(); - log.debug("{} - Received no records for partition {} in phase {}", id, partition, arrivedPhase); - } - }); - - int arrivedPhase = phaser.arriveAndAwaitAdvance(); - log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase); - } - } - catch(WakeupException e) + log.info("{} - Fetching PartitionInfo for topic {}", id, topic); + int numPartitions = consumer.partitionsFor(topic).size(); + log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); + partitionStates = new State[numPartitions]; + for (int i=0; i> getCounterState() + Map> getCounterState() { Map> result = new HashMap<>(assignedPartitions.size()); assignedPartitions.forEach(tp -> result.put(tp.partition(), counterState[tp.partition()].getCounterState())); return result; } - void sendCounterState(int partition, String key, Long counter) - { - seen[partition]++; - - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - stateTopic, // Topic - key, // Key - counter.toString() // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - log.debug( - "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms", - id, - record.key(), - record.value(), - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}", - id, - record.key(), - record.value(), - metadata == null ? -1 : metadata.timestamp(), - now - time, - e.toString() - ); - } - - acked[partition]++; - if (done[partition] && !(acked[partition] < seen[partition])) - { - int arrivedPhase = phaser.arrive(); - log.debug( - "{} - Arrived at phase {} for partition {}, seen={}, acked={}", - id, - arrivedPhase, - partition, - seen[partition], - acked[partition]); - } - else - { - log.debug( - "{} - Still in phase {} for partition {}, seen={}, acked={}", - id, - phaser.getPhase(), - partition, - seen[partition], - acked[partition]); - } - }); - - long now = System.currentTimeMillis(); - log.trace( - "{} - Queued message {}={}, latency={}ms", - id, - record.key(), - record.value(), - now - time - ); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions - .stream() - .filter(partition -> partition.topic().equals(topic)) - .forEach(partition -> restoreAndAssign(partition.partition())); - } - - @Override - public synchronized void onPartitionsRevoked(Collection partitions) - { - partitions - .stream() - .filter(partition -> partition.topic().equals(topic)) - .forEach(partition -> revoke(partition.partition())); - } - - private void restoreAndAssign(int partition) - { - TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); - - long stateEndOffset = consumer - .endOffsets(List.of(statePartition)) - .get(statePartition) - .longValue(); - - long stateBeginningOffset = consumer - .beginningOffsets(List.of(statePartition)) - .get(statePartition); - - log.info( - "{} - Found beginning-offset {} and end-offset {} for state partition {}", - id, - stateBeginningOffset, - stateEndOffset, - partition); - - if (stateBeginningOffset < stateEndOffset) - { - stateRestoring(partition, stateBeginningOffset, stateEndOffset); - } - else - { - log.info("{} - No state available for partition {}", id, partition); - restoredState[partition] = new HashMap<>(); - stateAssigned(partition); - } - } private void revoke(int partition) { -- 2.20.1