@Override
public void run()
{
+ OffsetRecorder offsetRecorder = new OffsetRecorder(topic, partitions);
+
try
{
- OffsetRecorder offsetRecorder = new OffsetRecorder(topic, partitions);
+ producer.initTransactions();
log.info("{} - Subscribing to topic {}", id, topic);
consumer.subscribe(Arrays.asList(topic), offsetRecorder);
+ producer.beginTransaction();
+
while (true)
{
ConsumerRecords<K, V> records =
producer.sendOffsetsToTransaction(
offsetRecorder.getOffsets(),
consumer.groupMetadata());
+ producer.commitTransaction();
+ producer.beginTransaction();
}
consumed++;
}
catch(WakeupException e)
{
log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
- consumer.commitSync();
+ producer.sendOffsetsToTransaction(
+ offsetRecorder.getOffsets(),
+ consumer.groupMetadata());
+ producer.commitTransaction();
shutdown();
}
catch(RecordDeserializationException e)
offset,
e.getCause().toString());
- consumer.commitSync();
+ producer.sendOffsetsToTransaction(
+ offsetRecorder.getOffsets(),
+ consumer.groupMetadata());
+ producer.commitTransaction();
shutdown(e);
}
catch(Exception e)
{
log.error("{} - Unexpected error: {}", id, e.toString(), e);
+ producer.abortTransaction();
shutdown(e);
}
finally
if (!activePartitions[partition])
throw new IllegalStateException("Partition " + partition + " is not active!");
- offsets[partition] = offset;
+ offsets[partition] = offset + 1;
}
Map<TopicPartition, OffsetAndMetadata> getOffsets()
for (int i=0; i<offsets.length; i++)
{
if (activePartitions[i] && offsets[i] > -1)
+ {
+ log.info("Offset for partition {} is {}", i, offsets[i]);
recordedOffsets.put(
new TopicPartition(topic, i),
new OffsetAndMetadata(offsets[i]));
+ }
}
return recordedOffsets;
() -> new HashMap<TopicPartition, OffsetAndMetadata>(),
(map, topicPartition) ->
{
- log.info("Commiting & deactivating revoked partition {}", topicPartition);
+ log.info("Deactivating revoked partition {}", topicPartition);
activePartitions[topicPartition.partition()] = false;
- map.put(topicPartition, new OffsetAndMetadata(offsets[topicPartition.partition()]));
+ if (offsets[topicPartition.partition()] < 0)
+ {
+ log.info("No offset to commit for partition {}", topicPartition);
+ }
+ else
+ {
+ log.info(
+ "Commiting offset {} for partition {}",
+ offsets[topicPartition.partition()],
+ topicPartition);
+ map.put(topicPartition, new OffsetAndMetadata(offsets[topicPartition.partition()]));
+ }
},
(mapA, mapB) -> mapA.putAll(mapB)),
consumer.groupMetadata());