import { Inject, Injectable, InjectionToken } from '@angular/core' import { Bootstrapper, PatchDB, Source, Store } from 'patch-db-client' import { BehaviorSubject, Observable, of, Subscription } from 'rxjs' import { catchError, debounceTime, finalize, map, tap } from 'rxjs/operators' import { ApiService } from '../api/api.service' import { DataModel } from './data-model' export const PATCH_HTTP = new InjectionToken>('app.config') export const PATCH_SOURCE = new InjectionToken>('app.config') export const BOOTSTRAPPER = new InjectionToken>('app.config') export enum ConnectionStatus { Initializing = 'initializing', Connected = 'connected', Disconnected = 'disconnected', } @Injectable({ providedIn: 'root', }) export class PatchDbModel { connectionStatus$ = new BehaviorSubject(ConnectionStatus.Initializing) sequence$: Observable data: DataModel private patchDb: PatchDB private patchSub: Subscription constructor ( @Inject(PATCH_SOURCE) private readonly source: Source, @Inject(PATCH_HTTP) private readonly http: ApiService, @Inject(BOOTSTRAPPER) private readonly bootstrapper: Bootstrapper, ) { } async init (): Promise { const cache = await this.bootstrapper.init() this.patchDb = new PatchDB([this.source, this.http], this.http, cache) this.sequence$ = this.patchDb.store.sequence$.asObservable() this.data = this.patchDb.store.cache.data } start (): void { // make sure everything is stopped before initializing this.stop() try { this.patchSub = this.patchDb.sync$() .pipe(debounceTime(500)) .subscribe({ next: cache => { console.log('saving cacheee: ', cache) this.connectionStatus$.next(ConnectionStatus.Connected) this.bootstrapper.update(cache) }, error: e => { console.error('patch-db-sync sub ERROR', e) this.connectionStatus$.next(ConnectionStatus.Disconnected) // this.start() }, complete: () => { console.error('patch-db-sync sub COMPLETE') }, }) } catch (e) { console.log('Failed to initialize PatchDB', e) } } stop (): void { if (this.patchSub) { this.patchSub.unsubscribe() this.patchSub = undefined } } connected$ (): Observable { return this.connectionStatus$ .pipe( map(status => status === ConnectionStatus.Connected), ) } watchConnection$ (): Observable { return this.connectionStatus$.asObservable() } watch$: Store['watch$'] = (...args: (string | number)[]): Observable => { console.log('WATCHING', ...args) return this.patchDb.store.watch$(...(args as [])).pipe( tap(cache => console.log('CHANGE IN STORE', cache)), catchError(e => { console.error(e) return of(e.message) }), finalize(() => console.log('UNSUBSCRIBING')), ) } }