-
- Integer partition = record.partition();
- String key = record.key() == null ? "NULL" : record.key().toString();
- Map<String, Long> byKey = seen.get(partition);
-
- if (!byKey.containsKey(key))
- byKey.put(key, 0l);
-
- long seenByKey = byKey.get(key);
- seenByKey++;
- byKey.put(key, seenByKey);
- }
-
- if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
- {
- log.debug("Storing data and offsets, last commit: {}", lastCommit);
- seen.forEach((partiton, statistics) -> repository.save(
- new StatisticsDocument(
- partiton,
- statistics,
- consumer.position(new TopicPartition(topic, partiton)))));
- lastCommit = clock.instant();
- }
- }
- }
- catch(WakeupException e)
- {
- log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
- shutdown();
- }
- catch(RecordDeserializationException e)
- {
- TopicPartition tp = e.topicPartition();
- long offset = e.offset();
- log.error(
- "{} - Could not deserialize message on topic {} with offset={}: {}",
- id,
- tp,
- offset,
- e.getCause().toString());
-
- shutdown(e);
- }
- catch(Exception e)
- {
- log.error("{} - Unexpected error: {}", id, e.toString(), e);
- shutdown(e);
- }
- finally
- {
- log.info("{} - Consumer-Thread exiting", id);
- }