-
- Mono.error(() -> new MessageMutationException(existing, text)));
- Message message = new Message(key, (long)messages.size(), timestamp, text);
- messages.put(message.getKey(), message);
- return message;
+ return strategy.persistMessage(key, timestamp, text);
+ }
+
+ synchronized protected void addMessage(Message message) throws MessageMutationException
+ {
+ Message existingMessage = messages.get(message.getKey());
+
+ if (existingMessage == null)
+ {
+ messages.put(existingMessage.getKey(), existingMessage);
+ }
+ else
+ {
+ if (!existingMessage.getMessageText().equals(message.getMessageText()))
+ {
+ throw new MessageMutationException(existingMessage, message.getMessageText());
+ }
+
+ // Warn and emit existing message
+ log.warn(
+ "Keeping existing message with {}@{} for {}",
+ existingMessage.getSerialNumber(),
+ existingMessage.getTimestamp(),
+ existingMessage.getKey());
+ }