refactor: One stream -> using `flatMap` instead of an inner `subscribe`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / ShardedChatHomeService.java
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
5 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
6 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
10
11 import java.net.URI;
12 import java.util.Arrays;
13 import java.util.HashSet;
14 import java.util.Set;
15 import java.util.UUID;
16 import java.util.stream.Collectors;
17
18
19 @Slf4j
20 public class ShardedChatHomeService implements ChatHomeService
21 {
22   private final String instanceId;
23   private final SimpleChatHomeService[] chatHomes;
24   private final Set<Integer> ownedShards;
25   private final String[] shardOwners;
26   private final ShardingStrategy shardingStrategy;
27
28
29   public ShardedChatHomeService(
30       String instanceId,
31       SimpleChatHomeService[] chatHomes,
32       URI[] shardOwners,
33       ShardingStrategy shardingStrategy)
34   {
35     this.instanceId = instanceId;
36     this.chatHomes = chatHomes;
37     this.shardOwners = Arrays
38         .stream(shardOwners)
39         .map(uri -> uri.toASCIIString())
40         .toArray(size -> new String[size]);
41     this.shardingStrategy = shardingStrategy;
42     this.ownedShards = new HashSet<>();
43     for (int shard = 0; shard < chatHomes.length; shard++)
44       if(chatHomes[shard] != null)
45         this.ownedShards.add(shard);
46     log.info("Created {}", this);
47   }
48
49
50   @Override
51   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
52   {
53     int shard = shardingStrategy.selectShard(id);
54     return chatHomes[shard] == null
55         ? Mono.error(new ShardNotOwnedException(instanceId, shard))
56         : chatHomes[shard].createChatRoom(id, name);
57   }
58
59   @Override
60   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
61   {
62     int shard = selectShard(id);
63     return chatHomes[shard] == null
64         ? Mono.error(new ShardNotOwnedException(instanceId, shard))
65         : chatHomes[shard]
66             .getChatRoomInfo(id)
67             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
68             ? new UnknownChatroomException(
69                 id,
70                 shard,
71                 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
72             : throwable);
73   }
74
75   @Override
76   public Flux<ChatRoomInfo> getChatRoomInfo()
77   {
78     return Flux
79         .fromIterable(ownedShards)
80         .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
81   }
82
83   @Override
84   public Mono<ChatRoomData> getChatRoomData(UUID id)
85   {
86     int shard = selectShard(id);
87     return chatHomes[shard] == null
88         ? Mono.error(new ShardNotOwnedException(instanceId, shard))
89         : chatHomes[shard]
90             .getChatRoomData(id)
91             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
92                 ? new UnknownChatroomException(
93                 id,
94                 shard,
95                 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
96                 : throwable);
97   }
98
99   @Override
100   public Mono<String[]> getShardOwners()
101   {
102     return Mono.just(shardOwners);
103   }
104
105   private int selectShard(UUID chatroomId)
106   {
107     return shardingStrategy.selectShard(chatroomId);
108   }
109
110   @Override
111   public String toString()
112   {
113     StringBuffer stringBuffer = new StringBuffer(ShardedChatHomeService.class.getSimpleName());
114     stringBuffer.append(", shards=[");
115     stringBuffer.append(ownedShards
116         .stream()
117         .sorted()
118         .map(String::valueOf)
119         .collect(Collectors.joining(",")));
120     stringBuffer.append("]");
121     return stringBuffer.toString();
122   }
123 }