- if (metadata != null)
- {
- // On successful send
- Message message = messages.get(key);
- if (message != null)
- {
- if (message.getMessageText().equals(text))
- {
- // Warn and emit existing message
- log.warn(
- "Keeping existing message with {}@{} for {}",
- message.getSerialNumber(),
- message.getTimestamp(), key);
- }
- else
- {
- // Emit error and abort
- sink.error(new MessageMutationException(message, text));
- return;
- }
- }
- else
- {
- // Emit new message
- message = new Message(key, metadata.offset(), timestamp, text);
- messages.put(message.getKey(), message);
- }