log.info("{} - Received {} messages", id, records.count());
for (ConsumerRecord<String, String> record : records)
{
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
+ try
+ {
+ handleRecord(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.key(),
+ record.value());
+ }
+ catch (NumberFormatException e)
+ {
+ log.error(
+ "{} - Ignoring invalid message for offset {} on partition {}: {}",
+ id,
+ record.offset(),
+ record.partition(),
+ record.value());
+ }
}
}
}
String value)
{
consumed++;
- log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
+ long message = Long.parseLong(value);
+ log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, message);
}