1 package de.juplo.kafka.chat.backend.persistence;
3 import com.fasterxml.jackson.core.JsonParser;
4 import com.fasterxml.jackson.core.JsonToken;
5 import com.fasterxml.jackson.databind.JavaType;
6 import com.fasterxml.jackson.databind.ObjectMapper;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.reactivestreams.Publisher;
10 import org.reactivestreams.Subscriber;
11 import org.reactivestreams.Subscription;
13 import java.io.IOException;
14 import java.nio.file.Files;
15 import java.nio.file.NoSuchFileException;
16 import java.nio.file.Path;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.function.Consumer;
22 @RequiredArgsConstructor
24 public class JsonFilePublisher<T> implements Publisher<T>
26 private final Path path;
27 private final ObjectMapper mapper;
28 private final JavaType type;
32 public void subscribe(Subscriber<? super T> subscriber)
34 log.info("Reading chatrooms from {}", path);
39 mapper.getFactory().createParser(Files.newBufferedReader(path));
41 if (parser.nextToken() != JsonToken.START_ARRAY)
43 throw new IllegalStateException("Expected content to be an array");
46 subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
48 catch (NoSuchFileException e)
50 log.info("{} does not exist - starting with empty ChatHome", path);
51 subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
53 catch (IOException | IllegalStateException e)
55 subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
59 @RequiredArgsConstructor
60 private class JsonFileSubscription implements Subscription
62 private final Subscriber<? super T> subscriber;
63 private final JsonParser parser;
66 public void request(long requested)
70 while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
72 subscriber.onNext(mapper.readValue(parser, type));
77 subscriber.onComplete();
81 subscriber.onError(e);
86 public void cancel() {}
89 private class ReplaySubscription implements Subscription
91 private final Subscriber<? super T> subscriber;
92 private final Iterator<Consumer<Subscriber<? super T>>> iterator;
95 Subscriber<? super T> subscriber,
96 Iterable<Consumer<Subscriber<? super T>>> actions)
98 this.subscriber = subscriber;
99 this.iterator = actions.iterator();
103 public void request(long requested)
105 while (requested > 0 && iterator.hasNext())
107 iterator.next().accept(subscriber);
112 subscriber.onComplete();
116 public void cancel() {}