WIP:PartitionState -- ALIGN consumer/spring-consumer--log-compaction--refactoring
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 16:27:08 +0000 (17:27 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 16:27:08 +0000 (17:27 +0100)
src/main/java/de/juplo/kafka/PartitionState.java

index fa9bba9..92df759 100644 (file)
@@ -43,104 +43,27 @@ public class PartitionState implements Runnable, ConsumerRebalanceListener
     String clientId,
     String topic,
     Consumer<String, String> consumer,
-    String stateTopic,
-    Producer<String, String> producer)
+    String stateTopic)
   {
     this.id = clientId;
     this.topic = topic;
     this.consumer = consumer;
     this.stateTopic = stateTopic;
-    this.producer = producer;
 
-    workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
-    workerThread.start();
-  }
-
-
-  @Override
-  public void run()
-  {
-    try
-    {
-      log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
-      int numPartitions = consumer.partitionsFor(topic).size();
-      log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
-      partitionStates = new State[numPartitions];
-      for (int i=0; i<numPartitions; i++)
-      {
-        partitionStates[i] = State.UNASSIGNED;
-      }
-      restoredState = new Map[numPartitions];
-      counterState = new CounterState[numPartitions];
-      stateEndOffsets = new long[numPartitions];
-      seen = new int[numPartitions];
-      acked = new int[numPartitions];
-      done = new boolean[numPartitions];
-
-      log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic, stateTopic), this);
-      running = true;
-
-      while (running)
-      {
-        ConsumerRecords<String, String> records =
-            consumer.poll(Duration.ofSeconds(1));
-
-        int phase = phaser.getPhase();
-
-        assignedPartitions
-          .forEach(partition ->
-          {
-            seen[partition.partition()] = 0;
-            acked[partition.partition()] = 0;
-            done[partition.partition()] = false;
-          });
-
-        log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
-        records
-          .partitions()
-          .forEach(partition ->
-          {
-            for (ConsumerRecord<String, String> record : records.records(partition))
-            {
-              handleRecord(
-                record.topic(),
-                record.partition(),
-                record.offset(),
-                record.key(),
-                record.value());
-            }
-
-            done[partition.partition()] = true;
-          });
-
-        assignedPartitions
-          .forEach(partition ->
-          {
-            if (seen[partition.partition()] == 0)
-            {
-              int arrivedPhase = phaser.arrive();
-              log.debug("{} - Received no records for partition {} in phase {}", id, partition, arrivedPhase);
-            }
-          });
-
-        int arrivedPhase = phaser.arriveAndAwaitAdvance();
-        log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
-      }
-    }
-    catch(WakeupException e)
+    log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
+    int numPartitions = consumer.partitionsFor(topic).size();
+    log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+    partitionStates = new State[numPartitions];
+    for (int i=0; i<numPartitions; i++)
     {
-      log.info("{} - Consumer was signaled to finish its work", id);
-    }
-    catch(Exception e)
-    {
-      log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString());
-      consumer.unsubscribe();
-    }
-    finally
-    {
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+      partitionStates[i] = State.UNASSIGNED;
     }
+    restoredState = new Map[numPartitions];
+    counterState = new CounterState[numPartitions];
+    stateEndOffsets = new long[numPartitions];
+    seen = new int[numPartitions];
+    acked = new int[numPartitions];
+    done = new boolean[numPartitions];
   }
 
   private void handleRecord(
@@ -163,7 +86,7 @@ public class PartitionState implements Runnable, ConsumerRebalanceListener
     }
   }
 
-  private synchronized void handleState(
+  synchronized void restoreState(
     int partition,
     long offset,
     String key,
@@ -187,153 +110,18 @@ public class PartitionState implements Runnable, ConsumerRebalanceListener
     }
   }
 
-  private void handleMessage(
-    Integer partition,
-    String key)
-  {
-    Long counter = computeCount(partition, key);
-    log.info("{} - current value for counter {}: {}", id, key, counter);
-    sendCounterState(partition, key, counter);
-  }
-
-  private synchronized Long computeCount(int partition, String key)
+  synchronized Long addToCounter(int partition, String key)
   {
     return counterState[partition].addToCounter(key);
   }
 
-  public Map<Integer, Map<String, Long>> getCounterState()
+  Map<Integer, Map<String, Long>> getCounterState()
   {
     Map<Integer, Map<String, Long>> result = new HashMap<>(assignedPartitions.size());
     assignedPartitions.forEach(tp -> result.put(tp.partition(), counterState[tp.partition()].getCounterState()));
     return result;
   }
 
-  void sendCounterState(int partition, String key, Long counter)
-  {
-    seen[partition]++;
-
-    final long time = System.currentTimeMillis();
-
-    final ProducerRecord<String, String> record = new ProducerRecord<>(
-        stateTopic,        // Topic
-        key,               // Key
-        counter.toString() // Value
-    );
-
-    producer.send(record, (metadata, e) ->
-    {
-      long now = System.currentTimeMillis();
-      if (e == null)
-      {
-        // HANDLE SUCCESS
-        log.debug(
-            "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms",
-            id,
-            record.key(),
-            record.value(),
-            metadata.partition(),
-            metadata.offset(),
-            metadata.timestamp(),
-            now - time
-        );
-      }
-      else
-      {
-        // HANDLE ERROR
-        log.error(
-            "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}",
-            id,
-            record.key(),
-            record.value(),
-            metadata == null ? -1 : metadata.timestamp(),
-            now - time,
-            e.toString()
-        );
-      }
-
-      acked[partition]++;
-      if (done[partition] && !(acked[partition] < seen[partition]))
-      {
-        int arrivedPhase = phaser.arrive();
-        log.debug(
-            "{} - Arrived at phase {} for partition {}, seen={}, acked={}",
-            id,
-            arrivedPhase,
-            partition,
-            seen[partition],
-            acked[partition]);
-      }
-      else
-      {
-        log.debug(
-            "{} - Still in phase {} for partition {}, seen={}, acked={}",
-            id,
-            phaser.getPhase(),
-            partition,
-            seen[partition],
-            acked[partition]);
-      }
-    });
-
-    long now = System.currentTimeMillis();
-    log.trace(
-        "{} - Queued message {}={}, latency={}ms",
-        id,
-        record.key(),
-        record.value(),
-        now - time
-    );
-  }
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions
-      .stream()
-      .filter(partition -> partition.topic().equals(topic))
-      .forEach(partition -> restoreAndAssign(partition.partition()));
-  }
-
-  @Override
-  public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions
-      .stream()
-      .filter(partition -> partition.topic().equals(topic))
-      .forEach(partition -> revoke(partition.partition()));
-  }
-
-  private void restoreAndAssign(int partition)
-  {
-    TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
-
-    long stateEndOffset = consumer
-      .endOffsets(List.of(statePartition))
-      .get(statePartition)
-      .longValue();
-
-    long stateBeginningOffset = consumer
-      .beginningOffsets(List.of(statePartition))
-      .get(statePartition);
-
-    log.info(
-      "{} - Found beginning-offset {} and end-offset {} for state partition {}",
-      id,
-      stateBeginningOffset,
-      stateEndOffset,
-      partition);
-
-    if (stateBeginningOffset < stateEndOffset)
-    {
-      stateRestoring(partition, stateBeginningOffset, stateEndOffset);
-    }
-    else
-    {
-      log.info("{} - No state available for partition {}", id, partition);
-      restoredState[partition] = new HashMap<>();
-      stateAssigned(partition);
-    }
-  }
 
   private void revoke(int partition)
   {