From 915433f877efb7b9c5d714d68d3cc7897d747a28 Mon Sep 17 00:00:00 2001 From: waterplea Date: Wed, 29 Jun 2022 11:51:28 +0300 Subject: [PATCH] fix: properly handle source errors without stopping the stream --- client/lib/patch-db.ts | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index 6610b56..d4e8468 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -1,5 +1,5 @@ -import { merge, Observable, ReplaySubject, Subject } from 'rxjs' -import { switchMap } from 'rxjs/operators' +import { EMPTY, merge, Observable, ReplaySubject, Subject } from 'rxjs' +import { catchError, switchMap } from 'rxjs/operators' import { Store } from './store' import { DBCache, Http } from './types' import { RPCError } from './source/ws-source' @@ -12,19 +12,20 @@ export class PatchDB { public cache$ = new ReplaySubject>(1) private sub = this.sources$.pipe( - switchMap(sources => merge(...sources.map(s => s.watch$(this.store)))), - ).subscribe({ - next: (res) => { - if ('result' in res) { - this.store.update(res.result) - this.cache$.next(this.store.cache) - } else { - this.rpcError$.next(res) - } - }, - error: (e) => { - this.connectionError$.next(e) - }, + switchMap(sources => merge(...sources.map(s => s.watch$(this.store))).pipe( + catchError(e => { + this.connectionError$.next(e) + + return EMPTY + }), + )), + ).subscribe((res) => { + if ('result' in res) { + this.store.update(res.result) + this.cache$.next(this.store.cache) + } else { + this.rpcError$.next(res) + } }) constructor (