log.info("{} - Received {} messages", id, records.count());
for (ConsumerRecord<K, V> record : records)
{
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
+ try
+ {
+ handleRecord(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.key(),
+ record.value());
+ }
+ catch (NumberFormatException e)
+ {
+ log.error(
+ "{} - Ignoring invalid message for offset {} on partition {}: {}",
+ id,
+ record.offset(),
+ record.partition(),
+ record.value());
+ }
}
}
}
Integer partition,
Long offset,
K key,
- V value)
+ V message)
{
+ long value = Long.parseLong(message.toString());
consumed++;
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
}