- return Mono.create(sink ->
- {
- ProducerRecord<String, MessageTo> record =
- new ProducerRecord<>(
- tp.topic(),
- tp.partition(),
- timestamp.toEpochSecond(zoneOffset),
- chatRoomId.toString(),
- MessageTo.of(key.getUsername(), key.getMessageId(), text));
-
- producer.send(record, ((metadata, exception) ->
- {
- if (metadata != null)
- {
- // On successful send
- Message message = messages.get(key);
- if (message != null)
- {
- if (message.getMessageText().equals(text))
- {
- // Warn and emit existing message
- log.warn(
- "Keeping existing message with {}@{} for {}",
- message.getSerialNumber(),
- message.getTimestamp(), key);
- }
- else
- {
- // Emit error and abort
- sink.error(new MessageMutationException(message, text));
- return;
- }
- }
- else
- {
- // Emit new message
- message = new Message(key, metadata.offset(), timestamp, text);
- messages.put(message.getKey(), message);
- }
+ return chatMessageChannel
+ .sendMessage(chatRoomId, key, timestamp, text)
+ .doOnSuccess(message -> persistMessage(message));
+ }