import { EMPTY, merge, Observable, ReplaySubject, Subject } from 'rxjs' import { catchError, switchMap } from 'rxjs/operators' import { Store } from './store' import { DBCache, Http } from './types' import { Source } from './source/source' export class PatchDB { public store: Store = new Store(this.http, this.initialCache) public connectionError$ = new Subject() public cache$ = new ReplaySubject>(1) constructor( private readonly sources$: Observable[]>, private readonly http: Http, private readonly initialCache: DBCache, ) {} ngOnInit() { this.sources$ .pipe( switchMap(sources => merge(...sources.map(s => s.watch$(this.store))).pipe( catchError(e => { this.connectionError$.next(e) return EMPTY }), ), ), ) .subscribe(res => { this.store.update(res) this.cache$.next(this.store.cache) }) } }