import { Bootstrapper, DBCache, Dump, Revision, Update } from './types'
import {
  BehaviorSubject,
  filter,
  Observable,
  Subscription,
  switchMap,
  take,
  withLatestFrom,
} from 'rxjs'
import {
  applyOperation,
  arrayFromPath,
  getValueByPointer,
  pathFromArray,
} from './json-patch-lib'

/**
 * Keeps an in-memory cache in sync with a stream of updates and lets callers
 * observe individual paths inside that cache.
 *
 * Each update is either a revision (it has a `patch` list of operations) or a
 * dump (it has a `value` that replaces the cached data). After every batch the
 * cache is re-emitted on `cache$`, and every watched path touched by the batch
 * receives its fresh value.
 *
 * NOTE(review): the generic parameter lists in this file were reconstructed;
 * the constraint on `T` and the arity of `Bootstrapper`, `DBCache`, `Dump` and
 * `Update` should be confirmed against `./types`.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class PatchDB<T extends { [key: string]: any }> {
  /** Subscription to `source$`; `null` while the instance is stopped. */
  private sub: Subscription | null = null

  /**
   * Registry of watched paths, keyed by the path string that `pathFromArray`
   * produces. `pathArr` is the same path as returned by `arrayFromPath`, kept
   * so prefix comparisons do not have to re-parse it.
   */
  private watchedNodes: {
    [path: string]: {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      subject: BehaviorSubject<any>
      pathArr: string[]
    }
  } = {}

  /** Current cache. A `sequence` of 0 marks the not-yet-initialised state. */
  readonly cache$ = new BehaviorSubject<DBCache<T>>({
    sequence: 0,
    data: {} as T,
  })

  /**
   * @param source$ Stream of update batches to apply to the cache.
   */
  constructor(private readonly source$: Observable<Update<T>[]>) {}

  /**
   * Seeds the cache from `bootstrapper.init()` and subscribes to `source$`.
   * Does nothing when already started.
   *
   * @param bootstrapper Supplies the initial cache and is handed the cache
   *   again via `update` after every processed batch.
   */
  start(bootstrapper: Bootstrapper<T>): void {
    if (this.sub) return

    this.cache$.next(bootstrapper.init())

    this.sub = this.source$
      .pipe(withLatestFrom(this.cache$))
      .subscribe(([updates, cache]) => {
        this.proccessUpdates(updates, cache)
        bootstrapper.update(cache)
      })
  }

  /**
   * Completes every watched subject, forgets all watched paths, unsubscribes
   * from `source$` and resets `cache$` to the empty, sequence-0 cache.
   * Does nothing when not started.
   */
  stop(): void {
    if (!this.sub) return

    for (const node of Object.values(this.watchedNodes)) {
      node.subject.complete()
    }
    this.watchedNodes = {}
    this.sub.unsubscribe()
    this.sub = null
    this.cache$.next({ sequence: 0, data: {} as T })
  }

  /**
   * Observes the value found at the given key path inside the cached data.
   *
   * The returned observable waits for the first cache emission whose
   * `sequence` is truthy, then switches to a `BehaviorSubject` for that path.
   * The subject is created on first use, seeded with the value currently at
   * the path, and shared by later calls with the same path.
   */
  watch$(): Observable<T>
  watch$<P1 extends keyof T>(p1: P1): Observable<NonNullable<T[P1]>>
  watch$<P1 extends keyof T, P2 extends keyof NonNullable<T[P1]>>(
    p1: P1,
    p2: P2,
  ): Observable<NonNullable<NonNullable<T[P1]>[P2]>>
  watch$<
    P1 extends keyof T,
    P2 extends keyof NonNullable<T[P1]>,
    P3 extends keyof NonNullable<NonNullable<T[P1]>[P2]>,
  >(
    p1: P1,
    p2: P2,
    p3: P3,
  ): Observable<NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>>
  watch$<
    P1 extends keyof T,
    P2 extends keyof NonNullable<T[P1]>,
    P3 extends keyof NonNullable<NonNullable<T[P1]>[P2]>,
    P4 extends keyof NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>,
  >(
    p1: P1,
    p2: P2,
    p3: P3,
    p4: P4,
  ): Observable<
    NonNullable<NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>[P4]>
  >
  watch$<
    P1 extends keyof T,
    P2 extends keyof NonNullable<T[P1]>,
    P3 extends keyof NonNullable<NonNullable<T[P1]>[P2]>,
    P4 extends keyof NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>,
    P5 extends keyof NonNullable<
      NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>[P4]
    >,
  >(
    p1: P1,
    p2: P2,
    p3: P3,
    p4: P4,
    p5: P5,
  ): Observable<
    NonNullable<
      NonNullable<NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>[P4]>[P5]
    >
  >
  watch$<
    P1 extends keyof T,
    P2 extends keyof NonNullable<T[P1]>,
    P3 extends keyof NonNullable<NonNullable<T[P1]>[P2]>,
    P4 extends keyof NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>,
    P5 extends keyof NonNullable<
      NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>[P4]
    >,
    P6 extends keyof NonNullable<
      NonNullable<NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>[P4]>[P5]
    >,
  >(
    p1: P1,
    p2: P2,
    p3: P3,
    p4: P4,
    p5: P5,
    p6: P6,
  ): Observable<
    NonNullable<
      NonNullable<
        NonNullable<
          NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>[P4]
        >[P5]
      >[P6]
    >
  >
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  watch$(...args: (string | number)[]): Observable<any> {
    return this.cache$.pipe(
      // Hold back until the cache carries a non-zero sequence.
      filter(({ sequence }) => !!sequence),
      take(1),
      switchMap(({ data }) => {
        const path = pathFromArray(args)
        let node = this.watchedNodes[path]
        if (!node) {
          node = {
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            subject: new BehaviorSubject<any>(getValueByPointer(data, path)),
            pathArr: arrayFromPath(path),
          }
          this.watchedNodes[path] = node
        }
        return node.subject
      }),
    )
  }

  /**
   * Applies a batch of updates to `cache` in order, then emits the cache on
   * `cache$`.
   *
   * A revision whose `id` is lower than `cache.sequence + 1` is skipped and
   * leaves the sequence untouched. Every other update is applied and sets
   * `cache.sequence` to its `id`. Dumps are always applied.
   *
   * The misspelled name is kept so existing callers keep working.
   *
   * @param updates Updates to apply, in order.
   * @param cache Cache object to update; it is mutated in place.
   */
  proccessUpdates(updates: Update<T>[], cache: DBCache<T>): void {
    for (const update of updates) {
      if (this.isRevision(update)) {
        const expected = cache.sequence + 1
        if (update.id < expected) continue
        this.handleRevision(update, cache)
      } else {
        this.handleDump(update, cache)
      }
      cache.sequence = update.id
    }
    this.cache$.next(cache)
  }

  /**
   * Applies every operation of `revision` to the cache, then refreshes each
   * watched node whose path is a prefix of an operation's path or has an
   * operation's path as its own prefix.
   */
  private handleRevision(revision: Revision, cache: DBCache<T>): void {
    // Apply operations. The whole cache object is handed to `applyOperation`;
    // presumably it updates `cache.data` — confirm in ./json-patch-lib.
    for (const op of revision.patch) {
      applyOperation(cache, op)
    }

    // Parse each operation path once instead of once per watched node.
    const opPaths = revision.patch.map(({ path }) => arrayFromPath(path))

    // Update watched nodes.
    for (const [watchedPath, { pathArr }] of Object.entries(
      this.watchedNodes,
    )) {
      const affected = opPaths.some(
        opPath => startsWith(pathArr, opPath) || startsWith(opPath, pathArr),
      )
      if (affected) this.updateWatchedNode(watchedPath, cache.data)
    }
  }

  /**
   * Replaces the cached data with a shallow copy of the dump's value and
   * refreshes every watched node.
   */
  private handleDump(dump: Dump<T>, cache: DBCache<T>): void {
    cache.data = { ...dump.value }
    for (const watchedPath of Object.keys(this.watchedNodes)) {
      this.updateWatchedNode(watchedPath, cache.data)
    }
  }

  /**
   * Pushes the value currently found at `path` in `data` to that path's
   * subject.
   */
  private updateWatchedNode(path: string, data: T): void {
    // A subscriber may call stop() synchronously while an update is being
    // propagated, which empties the registry while the caller is still
    // walking its snapshot of paths. Skip paths that are no longer registered
    // instead of throwing.
    const node = this.watchedNodes[path]
    if (!node) return
    node.subject.next(getValueByPointer(data, path))
  }

  /** Type guard: an update is a revision when it has a `patch` property. */
  private isRevision(update: Update<T>): update is Revision {
    return 'patch' in update
  }
}

/**
 * Reports whether `a` begins with every element of `b`, in order.
 * An empty `b` is a prefix of anything; a `b` longer than `a` never matches.
 */
function startsWith(a: readonly string[], b: readonly string[]): boolean {
  return b.every((segment, i) => a[i] === segment)
}