From 88eeaccd0a8fe1bf3b18a74ade0a9efd8120decc Mon Sep 17 00:00:00 2001 From: waterplea Date: Tue, 16 Aug 2022 17:03:04 +0300 Subject: [PATCH] chore: fix comments --- client/lib/patch-db.ts | 39 +++++++++++++++----------------- client/lib/source/poll-source.ts | 6 ++--- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index cc37938..e5be7a1 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -1,4 +1,4 @@ -import { EMPTY, merge, Observable, ReplaySubject, Subject } from 'rxjs' +import { EMPTY, map, merge, Observable, shareReplay, Subject } from 'rxjs' import { catchError, switchMap } from 'rxjs/operators' import { Store } from './store' import { DBCache, Http } from './types' @@ -7,30 +7,27 @@ import { Source } from './source/source' export class PatchDB { public store: Store = new Store(this.http, this.initialCache) public connectionError$ = new Subject() - public cache$ = new ReplaySubject>(1) + public cache$ = this.sources$.pipe( + switchMap(sources => + merge(...sources.map(s => s.watch$(this.store))).pipe( + catchError(e => { + this.connectionError$.next(e) + + return EMPTY + }), + ), + ), + map(res => { + this.store.update(res) + + return this.store.cache + }), + shareReplay(1), + ) constructor( private readonly sources$: Observable[]>, private readonly http: Http, private readonly initialCache: DBCache, ) {} - - ngOnInit() { - this.sources$ - .pipe( - switchMap(sources => - merge(...sources.map(s => s.watch$(this.store))).pipe( - catchError(e => { - this.connectionError$.next(e) - - return EMPTY - }), - ), - ), - ) - .subscribe(res => { - this.store.update(res) - this.cache$.next(this.store.cache) - }) - } } diff --git a/client/lib/source/poll-source.ts b/client/lib/source/poll-source.ts index 9bab917..678bc4f 100644 --- a/client/lib/source/poll-source.ts +++ b/client/lib/source/poll-source.ts @@ -1,5 +1,5 @@ -import { from, Observable, of, repeatWhen, timer } from 'rxjs' -import { concatMap, map, take } from 'rxjs/operators' +import { from, Observable, of, repeat } from 'rxjs' +import { concatMap, take } from 'rxjs/operators' import { Store } from '../store' import { Http, Update } from '../types' import { Source } from './source' @@ -20,7 +20,7 @@ export class PollSource implements Source { take(1), // convert Revision[] it into Observable. Convert Dump into Observable> concatMap(res => (Array.isArray(res) ? from(res) : of(res))), - repeatWhen(() => timer(this.pollConfig.cooldown)), + repeat({ delay: this.pollConfig.cooldown }), ) } }