WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.time.ZoneId;
14 import java.util.*;
15
16
17 @Slf4j
18 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
19 {
20   private final Consumer<String, MessageTo> consumer;
21   private final Producer<String, MessageTo> producer;
22   private final String topic;
23   private final ZoneId zoneId;
24   // private final long[] offsets; Erst mal immer alles neu einlesen
25   private final ChatHomeLoader[] chatHomeLoaders;
26   private final Map<UUID, ChatRoom>[] chatRoomMaps;
27
28
29   public KafkaChatHomeService(
30     Consumer<String, MessageTo> consumer,
31     Producer<String, MessageTo> producer,
32     String topic,
33     ZoneId zoneId,
34     int numShards)
35   {
36     log.debug("Creating KafkaChatHomeService");
37     this.consumer = consumer;
38     this.producer = producer;
39     this.topic = topic;
40     this.zoneId = zoneId;
41     // this.offsets = new long[numShards];
42     // for (int i=0; i< numShards; i++)
43     // {
44     //   this.offsets[i] = 0l;
45     // }
46     this.chatHomeLoaders = new ChatHomeLoader[numShards];
47     this.chatRoomMaps = new Map[numShards];
48   }
49
50
51   @Override
52   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
53   {
54     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
55     {
56       if (!topicPartition.topic().equals(topic))
57       {
58         log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
59         return;
60       }
61
62       int partition = topicPartition.partition();
63       long unseenOffset = 0; // offsets[partition];
64
65       log.info(
66           "Loading messages from partition {}: start-offset={} -> current-offset={}",
67           partition,
68           unseenOffset,
69           currentOffset);
70
71       // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
72       consumer.seek(topicPartition, unseenOffset);
73       chatHomeLoaders[partition] = new ChatHomeLoader(
74           producer,
75           currentOffset,
76           zoneId);
77     });
78   }
79
80   @Override
81   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
82   {
83     partitions.forEach(tp ->
84     {
85       if (!tp.topic().equals(topic))
86       {
87         log.warn("Ignoring partition from unwanted topic: {}", tp);
88         return;
89       }
90
91       int partition = tp.partition();
92       long unseenOffset = offsets[partition];
93
94       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
95     });
96     log.info("Revoked partitions: {}", partitions);
97   }
98
99   @Override
100   public void onPartitionsLost(Collection<TopicPartition> partitions)
101   {
102     log.info("Revoked partitions: {}", partitions);
103   }
104
105   private void foo()
106   {
107     Set<Integer> owned = Arrays
108       .stream(ownedShards)
109       .collect(
110         () -> new HashSet<>(),
111         (set, i) -> set.add(i),
112         (a, b) -> a.addAll(b));
113     for (int shard = 0; shard < numShards; shard++)
114     {
115       chatRoomMaps[shard] = owned.contains(shard)
116         ? new HashMap<>()
117         : null;
118     }
119     chatroomFlux
120       .filter(chatRoom ->
121       {
122         if (owned.contains(chatRoom.getShard()))
123         {
124           return true;
125         }
126         else
127         {
128           log.info("Ignoring not owned chat-room {}", chatRoom);
129           return false;
130         }
131       })
132       .toStream()
133       .forEach(chatroom -> chatRoomMaps[chatroom.getShard()].put(chatroom.getId(), chatroom));
134   }
135
136   @Override
137   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
138   {
139     chatRoomMaps[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
140     return Mono.just(chatRoom);
141   }
142
143   @Override
144   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
145   {
146     return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
147   }
148
149   @Override
150   public Flux<ChatRoom> getChatRooms(int shard)
151   {
152     return Flux.fromStream(chatRoomMaps[shard].values().stream());
153   }
154 }