import { EMPTY, map, merge, Observable, shareReplay, Subject } from 'rxjs'
import { catchError, filter, switchMap, tap } from 'rxjs/operators'
import { Store } from './store'
import { DBCache, Http } from './types'
import { Source } from './source/source'

/**
 * Ties a set of update sources to a local `Store` and exposes the resulting
 * cache as an observable.
 *
 * NOTE(review): the generic parameter `T` and its use on `Store`, `Source`
 * and `DBCache` were reconstructed (the type arguments had been stripped,
 * leaving the invalid `Observable[]>`). Confirm the arity against the
 * declarations in `./store`, `./source/source` and `./types`.
 */
export class PatchDB<T> {
  // NOTE: the field initializers below read `this.http`, `this.initialCache`
  // and `this.sources$`, which are constructor parameter properties. This
  // relies on parameter properties being assigned before field initializers
  // run (TypeScript's legacy class-field emit, i.e. `useDefineForClassFields`
  // disabled). Keep that in mind if the compile target changes.

  /** Local store that every source watches and that receives each update. */
  public store: Store<T> = new Store(this.http, this.initialCache)

  /**
   * Emits the error caught from the merged source stream, and `null` each
   * time any source subsequently produces a value.
   */
  public connectionError$ = new Subject<unknown>()

  /**
   * Stream of the store's cache, re-emitted after each applied update.
   *
   * Pipeline, in order:
   *  1. Every emission of `sources$` switches to a fresh merge of
   *     `watch$(store)` over all sources in that emission, dropping the
   *     previous merge.
   *  2. An error from the merged stream is forwarded to `connectionError$`
   *     and replaced by `EMPTY`, so the inner stream ends quietly; nothing
   *     further is emitted until `sources$` emits again.
   *  3. Every value that comes through (including falsy ones, since this
   *     happens before the filter) pushes `null` to `connectionError$`.
   *  4. Falsy values are dropped.
   *  5. The remaining value is handed to `store.update`, and the current
   *     `store.cache` is emitted.
   *  6. The latest emission is replayed to late subscribers.
   */
  public cache$ = this.sources$.pipe(
    switchMap(sources =>
      merge(...sources.map(s => s.watch$(this.store))).pipe(
        catchError(e => {
          this.connectionError$.next(e)
          return EMPTY
        }),
      ),
    ),
    tap(_ => this.connectionError$.next(null)),
    filter(Boolean),
    map(res => {
      this.store.update(res)
      return this.store.cache
    }),
    shareReplay(1),
  )

  /**
   * @param sources$ Observable of the current list of sources to watch.
   * @param http Passed through to the `Store` constructor.
   * @param initialCache Passed through to the `Store` constructor.
   */
  constructor(
    private readonly sources$: Observable<Source<T>[]>,
    private readonly http: Http,
    private readonly initialCache: DBCache<T>,
  ) {}
}