WIP:compiles!
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.time.ZoneId;
14 import java.util.*;
15
16
17 @Slf4j
18 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
19 {
20   private final Consumer<String, MessageTo> consumer;
21   private final Producer<String, MessageTo> producer;
22   private final String topic;
23   private final ZoneId zoneId;
24   // private final long[] offsets; Erst mal immer alles neu einlesen
25   private final ChatHomeLoader[] chatHomeLoaders;
26   private final Map<UUID, ChatRoom>[] chatRoomMaps;
27
28
29   public KafkaChatHomeService(
30     Consumer<String, MessageTo> consumer,
31     Producer<String, MessageTo> producer,
32     String topic,
33     ZoneId zoneId,
34     int numShards)
35   {
36     log.debug("Creating KafkaChatHomeService");
37     this.consumer = consumer;
38     this.producer = producer;
39     this.topic = topic;
40     this.zoneId = zoneId;
41     // this.offsets = new long[numShards];
42     // for (int i=0; i< numShards; i++)
43     // {
44     //   this.offsets[i] = 0l;
45     // }
46     this.chatHomeLoaders = new ChatHomeLoader[numShards];
47     this.chatRoomMaps = new Map[numShards];
48   }
49
50
51   @Override
52   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
53   {
54     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
55     {
56       if (!topicPartition.topic().equals(topic))
57       {
58         log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
59         return;
60       }
61
62       int partition = topicPartition.partition();
63       long unseenOffset = 0; // offsets[partition];
64
65       log.info(
66           "Loading messages from partition {}: start-offset={} -> current-offset={}",
67           partition,
68           unseenOffset,
69           currentOffset);
70
71       // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
72       consumer.seek(topicPartition, unseenOffset);
73       chatHomeLoaders[partition] = new ChatHomeLoader(
74           producer,
75           currentOffset,
76           zoneId);
77     });
78   }
79
80   @Override
81   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
82   {
83     partitions.forEach(topicPartition ->
84     {
85       if (!topicPartition.topic().equals(topic))
86       {
87         log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
88         return;
89       }
90
91       int partition = topicPartition.partition();
92       // long unseenOffset = offsets[partition]; TODO: Offset merken...?
93     });
94     log.info("Revoked partitions: {}", partitions);
95   }
96
97   @Override
98   public void onPartitionsLost(Collection<TopicPartition> partitions)
99   {
100     // TODO: Muss auf den Verlust anders reagiert werden?
101     onPartitionsRevoked(partitions);
102   }
103
104   @Override
105   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
106   {
107     return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
108   }
109
110   @Override
111   public Flux<ChatRoom> getChatRooms(int shard)
112   {
113     return Flux.fromStream(chatRoomMaps[shard].values().stream());
114   }
115 }