package de.juplo.reactorm;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/juplo/reactorm/PaginatedSourcePublisher.class */
public class PaginatedSourcePublisher<T> implements Publisher<T> {
    public static final Logger LOG = LoggerFactory.getLogger(PaginatedSourcePublisher.class);
    public static final int DEFAULT_MAX = 100;
    public static final int DEFAULT_MIN = 100;
    public static final int DEFAULT_FILL = 20;
    private final Executor executor;
    private final BiFunction<Long, Long, List<T>> function;
    private final int min;
    private final int max;
    private final int fill;
    private volatile boolean completed;
    private volatile Throwable error;
    private long current;
    private long start;
    private boolean publishing;
    private final Map<Subscriber<? super T>, Request> requests;
    private final Queue<T> unpublished;

    public PaginatedSourcePublisher(BiFunction<Long, Long, List<T>> biFunction) {
        this(biFunction, 100, 100, 20);
    }

    public PaginatedSourcePublisher(Executor executor, BiFunction<Long, Long, List<T>> biFunction) {
        this(executor, biFunction, 100, 100, 20);
    }

    public PaginatedSourcePublisher(BiFunction<Long, Long, List<T>> biFunction, int i, int i2, int i3) {
        this(Executors.newSingleThreadExecutor(), biFunction, i, i2, i3);
    }

    public PaginatedSourcePublisher(Executor executor, BiFunction<Long, Long, List<T>> biFunction, int i, int i2, int i3) {
        this.completed = false;
        this.error = null;
        this.current = 0L;
        this.start = 0L;
        this.requests = new HashMap();
        this.unpublished = new LinkedBlockingQueue();
        this.executor = new SerialExecutor(executor);
        this.function = biFunction;
        if (i < 1) {
            throw new IllegalArgumentException("Minimum chunk size must be at least one!");
        }
        this.min = i;
        if (i2 < 1) {
            throw new IllegalArgumentException("Maximum chunk size must be at least one!");
        }
        this.max = i2;
        if (i3 < 0) {
            throw new IllegalArgumentException("Minimum queue length must be at least zero!");
        }
        if (i3 > i2 / 2) {
            throw new IllegalArgumentException("Minimum queue length can be at most the half of the maximum chunk size!");
        }
        this.fill = i3;
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: subscription from {}", this, Integer.toHexString(subscriber.hashCode()));
        }
        PaginatedSourceSubscription paginatedSourceSubscription = new PaginatedSourceSubscription(this, subscriber);
        try {
            synchronized (this.requests) {
                this.requests.put(subscriber, new Request());
            }
            subscriber.onSubscribe(paginatedSourceSubscription);
            publish();
        } catch (NullPointerException e) {
            throw e;
        } catch (Exception e2) {
            LOG.debug("{}: unexpected error {}", this, e2);
            this.error = e2;
            publish();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: subscription from {} done!", this, Integer.toHexString(subscriber.hashCode()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void unsubscribe(Subscriber subscriber) {
        synchronized (this.requests) {
            this.requests.remove(subscriber);
        }
    }

    public boolean isSubscribed(Subscriber subscriber) {
        boolean containsKey;
        synchronized (this.requests) {
            containsKey = this.requests.containsKey(subscriber);
        }
        return containsKey;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addRequest(Subscriber subscriber, long j) {
        synchronized (this.requests) {
            this.requests.get(subscriber).add(j);
        }
    }

    public boolean isCompleted() {
        return this.completed && this.unpublished.isEmpty();
    }

    public boolean hasError() {
        return this.error != null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void signalError(Throwable th) {
        this.error = th;
        publish();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void publish() {
        long longValue;
        if (this.error != null) {
            synchronized (this.requests) {
                this.requests.keySet().forEach(subscriber -> {
                    subscriber.onError(this.error);
                });
                this.requests.clear();
            }
            return;
        }
        if (this.publishing) {
            return;
        }
        this.publishing = true;
        int i = Integer.MAX_VALUE;
        while (i > 0 && !this.unpublished.isEmpty()) {
            i = 0;
            synchronized (this.requests) {
                Iterator<Map.Entry<Subscriber<? super T>, Request>> it = this.requests.entrySet().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Map.Entry<Subscriber<? super T>, Request> next = it.next();
                    if (this.unpublished.isEmpty()) {
                        LOG.debug("{}: no more itemes available for publication", this);
                        break;
                    }
                    if (next.getValue().consume()) {
                        T remove = this.unpublished.remove();
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{}: calling {}.onNext({}), remaining requested items for this subscriber: {}, available: {}", new Object[]{this, Integer.toHexString(next.getKey().hashCode()), remove, next.getValue(), Integer.valueOf(this.unpublished.size())});
                        }
                        this.current++;
                        i++;
                        try {
                            next.getKey().onNext(remove);
                        } catch (Throwable th) {
                            LOG.error("unallowed error while signalling onNext({}) to subscriber {}", remove, Integer.toHexString(next.getKey().hashCode()));
                            signalError(th);
                        }
                    } else if (LOG.isDebugEnabled()) {
                        LOG.debug("{}: request from subscriber {} is satisfied!", this, Integer.toHexString(next.getKey().hashCode()));
                    }
                }
            }
        }
        this.publishing = false;
        if (this.unpublished.isEmpty()) {
            if (this.completed) {
                synchronized (this.requests) {
                    this.requests.keySet().forEach(subscriber2 -> {
                        subscriber2.onComplete();
                    });
                    this.requests.clear();
                }
                return;
            }
            synchronized (this.requests) {
                longValue = ((Long) this.requests.values().stream().map(request -> {
                    return Long.valueOf(request.get());
                }).reduce(0L, (l, l2) -> {
                    long longValue2 = l.longValue() + l2.longValue();
                    return Long.valueOf((longValue2 > ((long) this.max) || (longValue2 <= 0 && l2.longValue() > 0)) ? this.max : longValue2);
                })).longValue();
            }
            if (longValue == 0) {
                return;
            }
            final long j = this.start;
            final long j2 = longValue < ((long) this.min) ? this.min : longValue;
            if (this.current + longValue < this.start - this.fill) {
                LOG.debug("{}: no need to request more items (published={}, requested={}, fetched={}, minfill={})", new Object[]{this, Long.valueOf(this.current), Long.valueOf(longValue), Long.valueOf(j), Integer.valueOf(this.fill)});
                return;
            }
            LOG.debug("{}: scheduling new fetch {} -> {} (published={}, requested={}, fetched={}, minfill={})", new Object[]{this, Long.valueOf(j + 1), Long.valueOf(j + j2), Long.valueOf(this.current), Long.valueOf(longValue), Long.valueOf(j), Integer.valueOf(this.fill)});
            this.start += j2;
            this.executor.execute(new Runnable() { // from class: de.juplo.reactorm.PaginatedSourcePublisher.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        PaginatedSourcePublisher.LOG.debug("{}: fetching {} -> {}", new Object[]{PaginatedSourcePublisher.this, Long.valueOf(j + 1), Long.valueOf(j + j2)});
                        List list = (List) PaginatedSourcePublisher.this.function.apply(Long.valueOf(j), Long.valueOf(j2));
                        PaginatedSourcePublisher.LOG.debug("{}: fetched {}", PaginatedSourcePublisher.this, Integer.valueOf(list.size()));
                        if (list.size() < j2) {
                            PaginatedSourcePublisher.LOG.debug("{}: DONE (no more events available)!", PaginatedSourcePublisher.this);
                            PaginatedSourcePublisher.this.completed = true;
                        }
                        PaginatedSourcePublisher.this.unpublished.addAll(list);
                    } catch (Exception e) {
                        PaginatedSourcePublisher.this.error = e;
                    } finally {
                        PaginatedSourcePublisher.this.publish();
                    }
                }
            });
        }
    }

    public String toString() {
        return Integer.toHexString(hashCode());
    }
}
