refactor: DRY for logging-category from io.projectreactor
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / storage / files / FilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.storage.files;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
11 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
12 import lombok.RequiredArgsConstructor;
13 import lombok.extern.slf4j.Slf4j;
14 import reactor.core.publisher.Flux;
15
16 import java.io.IOException;
17 import java.nio.file.Files;
18 import java.nio.file.Path;
19 import java.util.UUID;
20 import java.util.logging.Level;
21
22 import static java.nio.file.StandardOpenOption.CREATE;
23 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
24
25
26 @RequiredArgsConstructor
27 @Slf4j
28 public class FilesStorageStrategy implements StorageStrategy
29 {
30   public static final String CHATROOMS_FILENAME = "chatrooms.json";
31
32
33   private final Path storagePath;
34   private final ShardingStrategy shardingStrategy;
35   private final ObjectMapper mapper;
36   private final String loggingCategory = FilesStorageStrategy.class.getSimpleName();
37   private final Level loggingLevel;
38   private final boolean showOperatorLine;
39
40
41   @Override
42   public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
43   {
44     Path path = chatroomsPath();
45     log.info("Writing chatrooms to {}", path);
46     try
47     {
48       Files.createDirectories(storagePath);
49
50       JsonGenerator generator =
51           mapper
52               .getFactory()
53               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
54
55       return chatRoomInfoFlux
56           .log(
57               loggingCategory,
58               loggingLevel,
59               showOperatorLine)
60           .doFirst(() ->
61           {
62             try
63             {
64               generator.useDefaultPrettyPrinter();
65               generator.writeStartArray();
66             }
67             catch (IOException e)
68             {
69               throw new RuntimeException(e);
70             }
71           })
72           .doOnTerminate(() ->
73           {
74             try
75             {
76               generator.writeEndArray();
77               generator.close();
78             }
79             catch (IOException e)
80             {
81               throw new RuntimeException(e);
82             }
83           })
84           .map(chatRoomInfo ->
85           {
86             try
87             {
88               ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
89               generator.writeObject(chatRoomInfoTo);
90               return chatRoomInfo;
91             }
92             catch (IOException e)
93             {
94               throw new RuntimeException(e);
95             }
96           });
97     }
98     catch (IOException e)
99     {
100       throw new RuntimeException(e);
101     }
102   }
103
104   @Override
105   public Flux<ChatRoomInfo> readChatRoomInfo()
106   {
107     JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
108     return Flux
109         .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
110         .log(
111             loggingCategory,
112             loggingLevel,
113             showOperatorLine)
114         .map(chatRoomInfoTo ->
115         {
116           UUID chatRoomId = chatRoomInfoTo.getId();
117           int shard = shardingStrategy.selectShard(chatRoomId);
118
119           log.info(
120               "{} - old shard: {}, new shard:  {}",
121               chatRoomId,
122               chatRoomInfoTo.getShard(),
123               shard);
124
125           return new ChatRoomInfo(
126               chatRoomId,
127               chatRoomInfoTo.getName(),
128               shard);
129         });
130   }
131
132   @Override
133   public Flux<Message> writeChatRoomData(
134       UUID chatRoomId,
135       Flux<Message> messageFlux)
136   {
137     Path path = chatroomPath(chatRoomId);
138     log.info("Writing messages for {} to {}", chatRoomId, path);
139     try
140     {
141       Files.createDirectories(storagePath);
142
143       JsonGenerator generator =
144           mapper
145               .getFactory()
146               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
147
148       return messageFlux
149           .log(
150               loggingCategory,
151               loggingLevel,
152               showOperatorLine)
153           .doFirst(() ->
154           {
155             try
156             {
157               generator.useDefaultPrettyPrinter();
158               generator.writeStartArray();
159             }
160             catch (IOException e)
161             {
162               throw new RuntimeException(e);
163             }
164           })
165           .doOnTerminate(() ->
166           {
167             try
168             {
169               generator.writeEndArray();
170               generator.close();
171             }
172             catch (IOException e)
173             {
174               throw new RuntimeException(e);
175             }
176           })
177           .map(message ->
178           {
179             try
180             {
181               MessageTo messageTo = MessageTo.from(message);
182               generator.writeObject(messageTo);
183               return message;
184             }
185             catch (IOException e)
186             {
187               throw new RuntimeException(e);
188             }
189           });
190     }
191     catch (IOException e)
192     {
193       throw new RuntimeException(e);
194     }
195   }
196
197   @Override
198   public Flux<Message> readChatRoomData(UUID chatRoomId)
199   {
200     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
201     return Flux
202         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
203         .log(
204             loggingCategory,
205             loggingLevel,
206             showOperatorLine)
207         .map(MessageTo::toMessage);
208   }
209
210   Path chatroomsPath()
211   {
212     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
213   }
214
215   Path chatroomPath(UUID id)
216   {
217     return storagePath.resolve(Path.of(id.toString() + ".json"));
218   }
219 }