import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
import java.time.Instant;
import java.time.LocalDateTime;
@Slf4j
class ChatHomeLoader
{
+ private final Producer<String, MessageTo> producer;
private final long offsetOfFirstUnseenMessage;
private final ZoneId zoneId;
+ @Getter
private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
*/
boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
{
+ TopicPartition topicPartition =
+ new TopicPartition(record.topic(), record.partition());
Message.MessageKey messageKey = Message.MessageKey.of(
record.value().getUser(),
record.value().getId());
{
// All messages consumed: DONE!
log.trace(
- "Ignoring unseen message {}: topic={}, partition={}, offset={}",
+ "Ignoring unseen message {} on {}, offset={}",
messageKey,
- record.topic(),
- record.partition(),
+ topicPartition,
record.offset());
return true;
}
KafkaChatRoomService service = kafkaChatRoomServiceMap
.computeIfAbsent(record.key(), key ->
- {
- });
+ new KafkaChatRoomService(producer, topicPartition));
service.addMessage(new Message(
messageKey,
record.offset(),
time,
- record.value().getText()
- ));
+ record.value().getText()));
return false;
}