log.info("{} - Received {} messages", id, records.count());
for (ConsumerRecord<String, String> record : records)
{
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
+ try
+ {
+ handleRecord(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.key(),
+ record.value());
+ }
+ catch (NumberFormatException e)
+ {
+ log.error(
+ "{} - Ignoring invalid message for offset {} on partition {}: {}",
+ id,
+ record.offset(),
+ record.partition(),
+ record.value());
+ }
}
}
}
Integer partition,
Long offset,
String key,
- String value)
+ String message)
{
+ long value = Long.parseLong(message);
consumed++;
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
}