String clientId,
String topic,
Consumer<String, String> consumer,
- String stateTopic,
- Producer<String, String> producer)
+ String stateTopic)
{
this.id = clientId;
this.topic = topic;
this.consumer = consumer;
this.stateTopic = stateTopic;
- this.producer = producer;
- workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
- workerThread.start();
- }
-
-
- @Override
- public void run()
- {
- try
- {
- log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
- int numPartitions = consumer.partitionsFor(topic).size();
- log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
- partitionStates = new State[numPartitions];
- for (int i=0; i<numPartitions; i++)
- {
- partitionStates[i] = State.UNASSIGNED;
- }
- restoredState = new Map[numPartitions];
- counterState = new CounterState[numPartitions];
- stateEndOffsets = new long[numPartitions];
- seen = new int[numPartitions];
- acked = new int[numPartitions];
- done = new boolean[numPartitions];
-
- log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic, stateTopic), this);
- running = true;
-
- while (running)
- {
- ConsumerRecords<String, String> records =
- consumer.poll(Duration.ofSeconds(1));
-
- int phase = phaser.getPhase();
-
- assignedPartitions
- .forEach(partition ->
- {
- seen[partition.partition()] = 0;
- acked[partition.partition()] = 0;
- done[partition.partition()] = false;
- });
-
- log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
- records
- .partitions()
- .forEach(partition ->
- {
- for (ConsumerRecord<String, String> record : records.records(partition))
- {
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
- }
-
- done[partition.partition()] = true;
- });
-
- assignedPartitions
- .forEach(partition ->
- {
- if (seen[partition.partition()] == 0)
- {
- int arrivedPhase = phaser.arrive();
- log.debug("{} - Received no records for partition {} in phase {}", id, partition, arrivedPhase);
- }
- });
-
- int arrivedPhase = phaser.arriveAndAwaitAdvance();
- log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
- }
- }
- catch(WakeupException e)
+ log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
+ int numPartitions = consumer.partitionsFor(topic).size();
+ log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+ partitionStates = new State[numPartitions];
+ for (int i=0; i<numPartitions; i++)
{
- log.info("{} - Consumer was signaled to finish its work", id);
- }
- catch(Exception e)
- {
- log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString());
- consumer.unsubscribe();
- }
- finally
- {
- log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+ partitionStates[i] = State.UNASSIGNED;
}
+ restoredState = new Map[numPartitions];
+ counterState = new CounterState[numPartitions];
+ stateEndOffsets = new long[numPartitions];
+ seen = new int[numPartitions];
+ acked = new int[numPartitions];
+ done = new boolean[numPartitions];
}
private void handleRecord(
}
}
- private synchronized void handleState(
+ synchronized void restoreState(
int partition,
long offset,
String key,
}
}
- private void handleMessage(
- Integer partition,
- String key)
- {
- Long counter = computeCount(partition, key);
- log.info("{} - current value for counter {}: {}", id, key, counter);
- sendCounterState(partition, key, counter);
- }
-
- private synchronized Long computeCount(int partition, String key)
+ synchronized Long addToCounter(int partition, String key)
{
return counterState[partition].addToCounter(key);
}
- public Map<Integer, Map<String, Long>> getCounterState()
+ Map<Integer, Map<String, Long>> getCounterState()
{
Map<Integer, Map<String, Long>> result = new HashMap<>(assignedPartitions.size());
assignedPartitions.forEach(tp -> result.put(tp.partition(), counterState[tp.partition()].getCounterState()));
return result;
}
- void sendCounterState(int partition, String key, Long counter)
- {
- seen[partition]++;
-
- final long time = System.currentTimeMillis();
-
- final ProducerRecord<String, String> record = new ProducerRecord<>(
- stateTopic, // Topic
- key, // Key
- counter.toString() // Value
- );
-
- producer.send(record, (metadata, e) ->
- {
- long now = System.currentTimeMillis();
- if (e == null)
- {
- // HANDLE SUCCESS
- log.debug(
- "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms",
- id,
- record.key(),
- record.value(),
- metadata.partition(),
- metadata.offset(),
- metadata.timestamp(),
- now - time
- );
- }
- else
- {
- // HANDLE ERROR
- log.error(
- "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}",
- id,
- record.key(),
- record.value(),
- metadata == null ? -1 : metadata.timestamp(),
- now - time,
- e.toString()
- );
- }
-
- acked[partition]++;
- if (done[partition] && !(acked[partition] < seen[partition]))
- {
- int arrivedPhase = phaser.arrive();
- log.debug(
- "{} - Arrived at phase {} for partition {}, seen={}, acked={}",
- id,
- arrivedPhase,
- partition,
- seen[partition],
- acked[partition]);
- }
- else
- {
- log.debug(
- "{} - Still in phase {} for partition {}, seen={}, acked={}",
- id,
- phaser.getPhase(),
- partition,
- seen[partition],
- acked[partition]);
- }
- });
-
- long now = System.currentTimeMillis();
- log.trace(
- "{} - Queued message {}={}, latency={}ms",
- id,
- record.key(),
- record.value(),
- now - time
- );
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- partitions
- .stream()
- .filter(partition -> partition.topic().equals(topic))
- .forEach(partition -> restoreAndAssign(partition.partition()));
- }
-
- @Override
- public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions
- .stream()
- .filter(partition -> partition.topic().equals(topic))
- .forEach(partition -> revoke(partition.partition()));
- }
-
- private void restoreAndAssign(int partition)
- {
- TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
-
- long stateEndOffset = consumer
- .endOffsets(List.of(statePartition))
- .get(statePartition)
- .longValue();
-
- long stateBeginningOffset = consumer
- .beginningOffsets(List.of(statePartition))
- .get(statePartition);
-
- log.info(
- "{} - Found beginning-offset {} and end-offset {} for state partition {}",
- id,
- stateBeginningOffset,
- stateEndOffset,
- partition);
-
- if (stateBeginningOffset < stateEndOffset)
- {
- stateRestoring(partition, stateBeginningOffset, stateEndOffset);
- }
- else
- {
- log.info("{} - No state available for partition {}", id, partition);
- restoredState[partition] = new HashMap<>();
- stateAssigned(partition);
- }
- }
private void revoke(int partition)
{