refactor: Moved extracted the `subscribe()`-call from `StorageStrategy`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / storage / files / FilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.storage.files;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
11 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
12 import lombok.RequiredArgsConstructor;
13 import lombok.extern.slf4j.Slf4j;
14 import reactor.core.publisher.Flux;
15
16 import java.io.IOException;
17 import java.nio.file.Files;
18 import java.nio.file.Path;
19 import java.util.UUID;
20
21 import static java.nio.file.StandardOpenOption.CREATE;
22 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
23
24
25 @RequiredArgsConstructor
26 @Slf4j
27 public class FilesStorageStrategy implements StorageStrategy
28 {
29   public static final String CHATROOMS_FILENAME = "chatrooms.json";
30
31
32   private final Path storagePath;
33   private final ShardingStrategy shardingStrategy;
34   private final ObjectMapper mapper;
35
36
37   @Override
38   public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
39   {
40     Path path = chatroomsPath();
41     log.info("Writing chatrooms to {}", path);
42     try
43     {
44       Files.createDirectories(storagePath);
45
46       JsonGenerator generator =
47           mapper
48               .getFactory()
49               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
50
51       return chatRoomInfoFlux
52           .log()
53           .doFirst(() ->
54           {
55             try
56             {
57               generator.useDefaultPrettyPrinter();
58               generator.writeStartArray();
59             }
60             catch (IOException e)
61             {
62               throw new RuntimeException(e);
63             }
64           })
65           .doOnTerminate(() ->
66           {
67             try
68             {
69               generator.writeEndArray();
70               generator.close();
71             }
72             catch (IOException e)
73             {
74               throw new RuntimeException(e);
75             }
76           })
77           .map(chatRoomInfo ->
78           {
79             try
80             {
81               ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
82               generator.writeObject(chatRoomInfoTo);
83               return chatRoomInfo;
84             }
85             catch (IOException e)
86             {
87               throw new RuntimeException(e);
88             }
89           });
90     }
91     catch (IOException e)
92     {
93       throw new RuntimeException(e);
94     }
95   }
96
97   @Override
98   public Flux<ChatRoomInfo> readChatRoomInfo()
99   {
100     JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
101     return Flux
102         .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
103         .log()
104         .map(chatRoomInfoTo ->
105         {
106           UUID chatRoomId = chatRoomInfoTo.getId();
107           int shard = shardingStrategy.selectShard(chatRoomId);
108
109           log.info(
110               "{} - old shard: {}, new shard:  {}",
111               chatRoomId,
112               chatRoomInfoTo.getShard(),
113               shard);
114
115           return new ChatRoomInfo(
116               chatRoomId,
117               chatRoomInfoTo.getName(),
118               shard);
119         });
120   }
121
122   @Override
123   public Flux<Message> writeChatRoomData(
124       UUID chatRoomId,
125       Flux<Message> messageFlux)
126   {
127     Path path = chatroomPath(chatRoomId);
128     log.info("Writing messages for {} to {}", chatRoomId, path);
129     try
130     {
131       Files.createDirectories(storagePath);
132
133       JsonGenerator generator =
134           mapper
135               .getFactory()
136               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
137
138       return messageFlux
139           .log()
140           .doFirst(() ->
141           {
142             try
143             {
144               generator.useDefaultPrettyPrinter();
145               generator.writeStartArray();
146             }
147             catch (IOException e)
148             {
149               throw new RuntimeException(e);
150             }
151           })
152           .doOnTerminate(() ->
153           {
154             try
155             {
156               generator.writeEndArray();
157               generator.close();
158             }
159             catch (IOException e)
160             {
161               throw new RuntimeException(e);
162             }
163           })
164           .map(message ->
165           {
166             try
167             {
168               MessageTo messageTo = MessageTo.from(message);
169               generator.writeObject(messageTo);
170               return message;
171             }
172             catch (IOException e)
173             {
174               throw new RuntimeException(e);
175             }
176           });
177     }
178     catch (IOException e)
179     {
180       throw new RuntimeException(e);
181     }
182   }
183
184   @Override
185   public Flux<Message> readChatRoomData(UUID chatRoomId)
186   {
187     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
188     return Flux
189         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
190         .log()
191         .map(MessageTo::toMessage);
192   }
193
194   Path chatroomsPath()
195   {
196     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
197   }
198
199   Path chatroomPath(UUID id)
200   {
201     return storagePath.resolve(Path.of(id.toString() + ".json"));
202   }
203 }