From 0a72ceb1da7dfbcab9bc5e485822efe59843d7a3 Mon Sep 17 00:00:00 2001 From: waterplea Date: Tue, 21 Jun 2022 13:28:27 +0300 Subject: [PATCH] refactor: remove nested subscription --- client/lib/patch-db.ts | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index 501db48..6610b56 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -1,4 +1,5 @@ -import { merge, Observable, ReplaySubject, Subject, Subscription } from 'rxjs' +import { merge, Observable, ReplaySubject, Subject } from 'rxjs' +import { switchMap } from 'rxjs/operators' import { Store } from './store' import { DBCache, Http } from './types' import { RPCError } from './source/ws-source' @@ -10,22 +11,20 @@ export class PatchDB { public rpcError$ = new Subject() public cache$ = new ReplaySubject>(1) - private updatesSub?: Subscription - private sourcesSub = this.sources$.subscribe(sources => { - this.updatesSub = merge(...sources.map(s => s.watch$(this.store))).subscribe({ - next: (res) => { - if ('result' in res) { - this.store.update(res.result) - this.cache$.next(this.store.cache) - } - else { - this.rpcError$.next(res) - } - }, - error: (e) => { - this.connectionError$.next(e) - }, - }) + private sub = this.sources$.pipe( + switchMap(sources => merge(...sources.map(s => s.watch$(this.store)))), + ).subscribe({ + next: (res) => { + if ('result' in res) { + this.store.update(res.result) + this.cache$.next(this.store.cache) + } else { + this.rpcError$.next(res) + } + }, + error: (e) => { + this.connectionError$.next(e) + }, }) constructor ( @@ -35,7 +34,6 @@ export class PatchDB { ) { } clean () { - this.sourcesSub.unsubscribe() - this.updatesSub?.unsubscribe() + this.sub.unsubscribe() } }