RxJava: Aufruf onError ohne Abschluss / Abmeldung

Habe ich den folgenden code (*), die polling Verwendung eines scheduler, der ruft rekursiv die mitgelieferte beobachten.

(*) inspiriert von https://github.com/ReactiveX/RxJava/issues/448

Ist dies korrekt funktioniert, wenn ich nur an die onNext event zu den Abonnenten. Aber wenn ich den pass der onError Veranstaltung für die Teilnehmer, die unsubscribe-Ereignis aufgerufen wird und diese wiederum tötet den scheduler.

Möchte ich auch die Fehler, die zu den Abonnenten. Irgendwelche Ideen, wie das zu erreichen?

public Observable<Status> observe() {
    return Observable.create(new PollingSubscriberAction<>(service.getStatusObservable(), 5, TimeUnit.SECONDS));
}

private class PollingSubscriberAction<T> implements Observable.OnSubscribe<T> {
    private Subscription subscription;
    private Subscription innerSubscription;
    private Scheduler.Worker worker = Schedulers.newThread().createWorker();

    private Observable<T> observable;
    private long delayTime;
    private TimeUnit unit;

    public PollingSubscriberAction(final Observable<T> observable, long delayTime, TimeUnit unit) {
        this.observable = observable;
        this.delayTime = delayTime;
        this.unit = unit;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        subscription = worker.schedule(new Action0() {
            @Override
            public void call() {
                schedule(subscriber, true);
            }
        });

        subscriber.add(Subscriptions.create(new Action0() {
            @Override
            public void call() {
                subscription.unsubscribe();
                if (innerSubscription != null) {
                    innerSubscription.unsubscribe();
                }
            }
        }));
    }

    private void schedule(final Subscriber<? super T> subscriber, boolean immediately) {
        long delayTime = immediately ? 0 : this.delayTime;
        subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit);
    }

    private Action0 createInnerAction(final Subscriber<? super T> subscriber) {
        return new Action0() {
            @Override
            public void call() {
                innerSubscription = observable.subscribe(new Observer<T>() {
                    @Override
                    public void onCompleted() {
                        schedule(subscriber, false);
                    }

                    @Override
                    public void onError(Throwable e) {
                        //Doesn't work.
                        //subscriber.onError(e);
                        schedule(subscriber, false);
                    }

                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                });
            }
        };
    }
}
Schreibe einen Kommentar