refactor: `storage` is not a sub-package of `persistence` - Aligned code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / storage / files / JsonFilePublisher.java
1 package de.juplo.kafka.chat.backend.storage.files;
2
3 import com.fasterxml.jackson.core.JsonParser;
4 import com.fasterxml.jackson.core.JsonToken;
5 import com.fasterxml.jackson.databind.JavaType;
6 import com.fasterxml.jackson.databind.ObjectMapper;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.reactivestreams.Publisher;
10 import org.reactivestreams.Subscriber;
11 import org.reactivestreams.Subscription;
12
13 import java.io.IOException;
14 import java.nio.file.Files;
15 import java.nio.file.NoSuchFileException;
16 import java.nio.file.Path;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.function.Consumer;
20
21
22 @RequiredArgsConstructor
23 @Slf4j
24 public class JsonFilePublisher<T> implements Publisher<T>
25 {
26   private final Path path;
27   private final ObjectMapper mapper;
28   private final JavaType type;
29
30
31   @Override
32   public void subscribe(Subscriber<? super T> subscriber)
33   {
34     log.info("Reading chatrooms from {}", path);
35
36     try
37     {
38       JsonParser parser =
39           mapper.getFactory().createParser(Files.newBufferedReader(path));
40
41       if (parser.nextToken() != JsonToken.START_ARRAY)
42       {
43         throw new IllegalStateException("Expected content to be an array");
44       }
45
46       subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
47     }
48     catch (NoSuchFileException e)
49     {
50       log.info("{} does not exist - starting with empty ChatHome", path);
51       subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
52     }
53     catch (IOException | IllegalStateException e)
54     {
55       subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
56     }
57   }
58
59   @RequiredArgsConstructor
60   private class JsonFileSubscription implements Subscription
61   {
62     private final Subscriber<? super T> subscriber;
63     private final JsonParser parser;
64
65     @Override
66     public void request(long requested)
67     {
68       try
69       {
70         while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
71         {
72           subscriber.onNext(mapper.readValue(parser, type));
73           requested--;
74         }
75
76         if (requested > 0)
77           subscriber.onComplete();
78       }
79       catch (IOException e)
80       {
81         subscriber.onError(e);
82       }
83     }
84
85     @Override
86     public void cancel() {}
87   }
88
89   private class ReplaySubscription implements Subscription
90   {
91     private final Subscriber<? super T> subscriber;
92     private final Iterator<Consumer<Subscriber<? super T>>> iterator;
93
94     ReplaySubscription(
95         Subscriber<? super T> subscriber,
96         Iterable<Consumer<Subscriber<? super T>>> actions)
97     {
98       this.subscriber = subscriber;
99       this.iterator = actions.iterator();
100     }
101
102     @Override
103     public void request(long requested)
104     {
105       while (requested > 0 && iterator.hasNext())
106       {
107         iterator.next().accept(subscriber);
108         requested--;
109       }
110
111       if (requested > 0)
112         subscriber.onComplete();
113     }
114
115     @Override
116     public void cancel() {}
117   }
118 }