diff --git a/client/index.ts b/client/index.ts index 582ffe4..ec8c227 100644 --- a/client/index.ts +++ b/client/index.ts @@ -1,5 +1,4 @@ export * from './lib/source/mock-source' -export * from './lib/source/poll-source' export * from './lib/source/ws-source' export * from './lib/source/source' export * from './lib/json-patch-lib' diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index 5d8b894..bed63f9 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -1,5 +1,5 @@ -import { EMPTY, map, merge, Observable, shareReplay, Subject } from 'rxjs' -import { catchError, filter, switchMap, tap } from 'rxjs/operators' +import { map, shareReplay, Subject, timer } from 'rxjs' +import { catchError, concatMap, tap } from 'rxjs/operators' import { Store } from './store' import { DBCache, Http } from './types' import { Source } from './source/source' @@ -7,27 +7,21 @@ import { Source } from './source/source' export class PatchDB { public store: Store = new Store(this.http, this.initialCache) public connectionError$ = new Subject() - public cache$ = this.sources$.pipe( - switchMap(sources => - merge(...sources.map(s => s.watch$(this.store))).pipe( - catchError(e => { - this.connectionError$.next(e) - - return EMPTY - }), - ), - ), - tap(_ => this.connectionError$.next(null)), - filter(Boolean), - map(res => { - this.store.update(res) - return this.store.cache + public cache$ = this.source.watch$(this.store).pipe( + catchError((e, watch$) => { + this.connectionError$.next(e) + return timer(4000).pipe(concatMap(() => watch$)) }), + tap(res => { + this.connectionError$.next(null) + this.store.update(res) + }), + map(_ => this.store.cache), shareReplay(1), ) constructor( - private readonly sources$: Observable[]>, + private readonly source: Source, private readonly http: Http, private readonly initialCache: DBCache, ) {} diff --git a/client/lib/source/poll-source.ts b/client/lib/source/poll-source.ts deleted file mode 100644 index 25c82c6..0000000 --- a/client/lib/source/poll-source.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { from, Observable, of, repeat } from 'rxjs' -import { concatMap, take } from 'rxjs/operators' -import { Store } from '../store' -import { Http, Update } from '../types' -import { Source } from './source' - -export type PollConfig = { - cooldown: number -} - -export class PollSource implements Source { - constructor( - private readonly pollConfig: PollConfig, - private readonly http: Http, - ) {} - - watch$({ sequence$ }: Store): Observable | null> { - return sequence$.pipe( - concatMap(seq => this.http.getRevisions(seq)), - take(1), - concatMap(res => { - // If Revision[] - if (Array.isArray(res)) { - // Convert Revision[] it into Observable OR return null - return res.length ? from(res) : of(null) - // If Dump - } - // Convert Dump into Observable> - return of(res) - }), - repeat({ delay: this.pollConfig.cooldown }), - ) - } -} diff --git a/client/lib/source/source.ts b/client/lib/source/source.ts index ce5197f..4e69eb0 100644 --- a/client/lib/source/source.ts +++ b/client/lib/source/source.ts @@ -3,5 +3,5 @@ import { Store } from '../store' import { Update } from '../types' export interface Source { - watch$(store?: Store): Observable | null> + watch$(store?: Store): Observable> } diff --git a/client/lib/source/ws-source.ts b/client/lib/source/ws-source.ts index 1e8d531..f73c76c 100644 --- a/client/lib/source/ws-source.ts +++ b/client/lib/source/ws-source.ts @@ -7,6 +7,6 @@ export class WebsocketSource implements Source { constructor(private readonly url: string) {} watch$(): Observable> { - return webSocket>(this.url).pipe(timeout({ first: 60000 })) + return webSocket>(this.url).pipe(timeout({ first: 21000 })) } }