RxJava: Aufruf onError ohne Abschluss / Abmeldung
Habe ich den folgenden code (*), die polling Verwendung eines scheduler, der ruft rekursiv die mitgelieferte beobachten.
(*) inspiriert von https://github.com/ReactiveX/RxJava/issues/448
Ist dies korrekt funktioniert, wenn ich nur an die onNext
event zu den Abonnenten. Aber wenn ich den pass der onError
Veranstaltung für die Teilnehmer, die unsubscribe-Ereignis aufgerufen wird und diese wiederum tötet den scheduler.
Möchte ich auch die Fehler, die zu den Abonnenten. Irgendwelche Ideen, wie das zu erreichen?
public Observable<Status> observe() {
return Observable.create(new PollingSubscriberAction<>(service.getStatusObservable(), 5, TimeUnit.SECONDS));
}
private class PollingSubscriberAction<T> implements Observable.OnSubscribe<T> {
private Subscription subscription;
private Subscription innerSubscription;
private Scheduler.Worker worker = Schedulers.newThread().createWorker();
private Observable<T> observable;
private long delayTime;
private TimeUnit unit;
public PollingSubscriberAction(final Observable<T> observable, long delayTime, TimeUnit unit) {
this.observable = observable;
this.delayTime = delayTime;
this.unit = unit;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
subscription = worker.schedule(new Action0() {
@Override
public void call() {
schedule(subscriber, true);
}
});
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
subscription.unsubscribe();
if (innerSubscription != null) {
innerSubscription.unsubscribe();
}
}
}));
}
private void schedule(final Subscriber<? super T> subscriber, boolean immediately) {
long delayTime = immediately ? 0 : this.delayTime;
subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit);
}
private Action0 createInnerAction(final Subscriber<? super T> subscriber) {
return new Action0() {
@Override
public void call() {
innerSubscription = observable.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
schedule(subscriber, false);
}
@Override
public void onError(Throwable e) {
//Doesn't work.
//subscriber.onError(e);
schedule(subscriber, false);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
});
}
};
}
}
Du musst angemeldet sein, um einen Kommentar abzugeben.
So Spiele ich schon mit diesem für einige Zeit, und ich glaube nicht, dass es möglich ist, in der Weise Sie es tun. Aufruf
onError
oderonCompleted
beenden Sie den stream, spiegeln diedone
Flagge innerhalb derSafeSubscriber
wrapper, und es ist nicht nur ein Weg, um es zurückzusetzen.Ich sehe 2 Optionen zur Verfügung - weder finde ich besonders elegant, aber funktioniert.
1 -
UnsafeSubscribe
. Vielleicht nicht die beste Idee, aber es funktioniert, weil statt der Verpackung IhrerSubscriber
imSafeSubscriber
, ruft es direkt. Am besten Lesen Sie die Javadoc, um zu sehen, wenn das OK ist für Sie. Oder, wenn Sie abenteuerlustig fühlen, schreiben Sie Ihre eigenenSafeSubscriber
wo setzen Sie die done-flag oder ähnliches. Mit deinem Beispiel so nennen:2 - Umsetzung etwas ähnliches wie dieses Beispiel. Ich Schätze es ist in C#, aber es sollte lesbar sein. Einfach ausgedrückt - Sie wollen ein
Pair<T, Exception>
Klasse, und dann zu fordern, anstattonError
rufenonNext
und legen Sie die Ausnahme Seite Ihres pair. Ihre Abonnenten haben, um ein wenig klüger zu werden, um zu überprüfen, für jede Seite des Paares, und Sie müssen einige Daten-transformation zwischen Ihrer QuelleObservable
und dieObservable<Pair<T, Exception>>
, aber ich kann nicht sehen, warum es nicht funktionieren wird.Ich wäre wirklich daran interessiert, sehen einen anderen Weg, dies zu tun, wenn jemand keine.
Hoffe, das hilft,
Wird
Beide onError-und onCompleted sind beendende Ereignisse, was bedeutet, dass Ihr Beobachten nicht emittieren keine neuen Ereignisse nach dem Auftritt. Um zu schlucken/Griff-Fehler siehe Fehler-Betreiber - https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators. Auch, um zu implementieren polling-Sie könnten profitieren Sie von diesem one - http://reactivex.io/documentation/operators/interval.html
Als @Wird darauf hingewiesen, dass man nicht direkt nennen
onError
ohne die Beendigung des beobachtbaren. Da kann man nur rufenonNext
haben, beschloss ich, ein Benachrichtigung zu wickeln Sie den Wert und die throwable-in einem einzigen Objekt.Diesen zu nutzen, können Sie entweder die Benachrichtigung direkt:
Oder verwenden Sie den akzeptieren - Methode für die Benachrichtigung auf einen regulären Beobachten:
UPDATE 2015-02-24
Scheint es, dass die polling-beobachtbare hatte nicht richtig funktioniert, manchmal, weil die innere beobachtbaren nennen würde
onComplete
oderonError
auch nach es gewesen war abgemeldet, also der Umschuldung selbst. Ich fügte hinzu, dieisUnsubscribed
Flagge zu verhindern.