import { Bootstrapper, DBCache, Dump, Revision, Update } from './types'
import {
  BehaviorSubject,
  filter,
  Observable,
  Subscription,
  switchMap,
  take,
  withLatestFrom,
} from 'rxjs'
import {
  applyOperation,
  arrayFromPath,
  getValueByPointer,
  pathFromArray,
} from './json-patch-lib'

/**
 * Keeps a local cache (`sequence` + `data`) in sync with a stream of updates
 * and lets callers observe the whole cache or individual paths inside it.
 *
 * Updates arrive in batches from `source$`. An update that has a `patch`
 * property is treated as a revision and applied on top of the cache; any
 * other update is treated as a dump and replaces the cached data.
 */
export class PatchDB<T extends { [key: string]: any }> {
  /** Subscription to `source$`; `null` while the database is stopped. */
  private sub: Subscription | null = null

  /**
   * One entry per path requested through `watch$`, keyed by the path string
   * produced by `pathFromArray`. `pathArr` is the same path as returned by
   * `arrayFromPath`, stored so it does not have to be re-parsed per update.
   */
  private watchedNodes: {
    [path: string]: {
      subject: BehaviorSubject<any>
      pathArr: string[]
    }
  } = {}

  /** Current cache. A `sequence` of 0 means no data has been loaded yet. */
  readonly cache$ = new BehaviorSubject<DBCache<T>>({
    sequence: 0,
    data: {} as T,
  })

  /**
   * @param source$ stream of update batches to apply to the cache
   */
  constructor(private readonly source$: Observable<Update<T>[]>) {}

  /**
   * Seeds the cache from `bootstrapper.init()` and starts consuming
   * `source$`. After every processed batch the cache is handed to
   * `bootstrapper.update`. Does nothing if already started.
   */
  start(bootstrapper: Bootstrapper<T>): void {
    if (this.sub) return

    const initialCache = bootstrapper.init()
    this.cache$.next(initialCache)

    this.sub = this.source$
      .pipe(withLatestFrom(this.cache$))
      .subscribe(([updates, cache]) => {
        this.proccessUpdates(updates, cache)
        bootstrapper.update(cache)
      })
  }

  /**
   * Completes every watched-node subject, forgets them, unsubscribes from
   * `source$` and resets the cache to its empty state (`sequence: 0`).
   * Does nothing if not started.
   */
  stop(): void {
    if (!this.sub) return

    Object.values(this.watchedNodes).forEach(node => node.subject.complete())
    this.watchedNodes = {}
    this.sub.unsubscribe()
    this.sub = null
    this.cache$.next({ sequence: 0, data: {} as T })
  }

  /**
   * Observes the value at the given path (up to six typed segments; no
   * segments observes the whole data object).
   *
   * The returned observable waits for the first cache whose `sequence` is
   * non-zero, then switches to a per-path `BehaviorSubject` that is shared by
   * all watchers of the same path and is pushed a fresh value whenever an
   * update touches that path, one of its ancestors or one of its descendants.
   */
  watch$(): Observable<T>
  watch$<P1 extends keyof T>(p1: P1): Observable<NonNullable<T[P1]>>
  watch$<P1 extends keyof T, P2 extends keyof NonNullable<T[P1]>>(
    p1: P1,
    p2: P2,
  ): Observable<NonNullable<NonNullable<T[P1]>[P2]>>
  watch$<
    P1 extends keyof T,
    P2 extends keyof NonNullable<T[P1]>,
    P3 extends keyof NonNullable<NonNullable<T[P1]>[P2]>,
  >(
    p1: P1,
    p2: P2,
    p3: P3,
  ): Observable<NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>>
  watch$<
    P1 extends keyof T,
    P2 extends keyof NonNullable<T[P1]>,
    P3 extends keyof NonNullable<NonNullable<T[P1]>[P2]>,
    P4 extends keyof NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>,
  >(
    p1: P1,
    p2: P2,
    p3: P3,
    p4: P4,
  ): Observable<
    NonNullable<NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>[P4]>
  >
  watch$<
    P1 extends keyof T,
    P2 extends keyof NonNullable<T[P1]>,
    P3 extends keyof NonNullable<NonNullable<T[P1]>[P2]>,
    P4 extends keyof NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>,
    P5 extends keyof NonNullable<
      NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>[P4]
    >,
  >(
    p1: P1,
    p2: P2,
    p3: P3,
    p4: P4,
    p5: P5,
  ): Observable<
    NonNullable<
      NonNullable<NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>[P4]>[P5]
    >
  >
  watch$<
    P1 extends keyof T,
    P2 extends keyof NonNullable<T[P1]>,
    P3 extends keyof NonNullable<NonNullable<T[P1]>[P2]>,
    P4 extends keyof NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>,
    P5 extends keyof NonNullable<
      NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>[P4]
    >,
    P6 extends keyof NonNullable<
      NonNullable<NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>[P4]>[P5]
    >,
  >(
    p1: P1,
    p2: P2,
    p3: P3,
    p4: P4,
    p5: P5,
    p6: P6,
  ): Observable<
    NonNullable<
      NonNullable<
        NonNullable<
          NonNullable<NonNullable<NonNullable<T[P1]>[P2]>[P3]>[P4]
        >[P5]
      >[P6]
    >
  >
  watch$(...args: (string | number)[]): Observable<any> {
    return this.cache$.pipe(
      // Wait until the cache holds real data, then stop listening to cache$
      // and follow the per-path subject instead.
      filter(({ sequence }) => !!sequence),
      take(1),
      switchMap(({ data }) => {
        const path = pathFromArray(args)
        let node = this.watchedNodes[path]
        if (!node) {
          // First watcher of this path: seed the subject with the current value.
          node = {
            subject: new BehaviorSubject<any>(getValueByPointer(data, path)),
            pathArr: arrayFromPath(path),
          }
          this.watchedNodes[path] = node
        }
        return node.subject
      }),
    )
  }

  /**
   * Applies a batch of updates to `cache` in place, then emits it on `cache$`.
   *
   * A revision is applied only when its `id` is exactly `cache.sequence + 1`:
   * older revisions are skipped silently, newer ones are logged and skipped.
   * Dumps are applied without a sequence check. After each applied update
   * `cache.sequence` is set to that update's `id`.
   *
   * @param updates batch of revisions and/or dumps, processed in order
   * @param cache cache object to mutate
   */
  proccessUpdates(updates: Update<T>[], cache: DBCache<T>): void {
    updates.forEach(update => {
      if (this.isRevision(update)) {
        const expected = cache.sequence + 1
        // Already applied: ignore.
        if (update.id < expected) return
        if (update.id > expected) {
          // NOTE: the original author marked this branch as "unreachable".
          console.error(
            `Received futuristic revision. Expected ${expected}, got ${update.id}`,
          )
          return
        }
        this.handleRevision(update, cache)
      } else {
        this.handleDump(update, cache)
      }
      cache.sequence = update.id
    })
    this.cache$.next(cache)
  }

  /**
   * Applies every operation of the revision's patch to the cache, then
   * notifies the watched nodes related to each operation's path.
   */
  private handleRevision(revision: Revision, cache: DBCache<T>): void {
    // Apply all operations first so watchers only ever see the final state.
    revision.patch.forEach(op => {
      applyOperation(cache, op)
    })
    // Then notify watched nodes, once per operation path.
    revision.patch.forEach(op => {
      this.updateWatchedNodes(op.path, cache.data)
    })
  }

  /**
   * Replaces the cached data with a shallow copy of the dump's value and
   * notifies watched nodes using the empty path.
   */
  private handleDump(dump: Dump<T>, cache: DBCache<T>): void {
    cache.data = { ...dump.value }
    // Presumably '' denotes the root, so every watched node matches
    // (assumes arrayFromPath('') yields an empty array - TODO confirm).
    this.updateWatchedNodes('', cache.data)
  }

  /**
   * Pushes a fresh value to every watched node whose path is a prefix of
   * `revisionPath` or has `revisionPath` as a prefix.
   */
  private updateWatchedNodes(revisionPath: string, data: T): void {
    const r = arrayFromPath(revisionPath)
    Object.entries(this.watchedNodes).forEach(([path, { pathArr }]) => {
      if (startsWith(pathArr, r) || startsWith(r, pathArr)) {
        this.updateWatchedNode(path, data)
      }
    })
  }

  /** Reads the value at `path` from `data` and emits it on that path's subject. */
  private updateWatchedNode(path: string, data: T): void {
    // A subscriber may call stop() synchronously while nodes are being
    // notified, which clears watchedNodes; skip nodes that no longer exist
    // instead of throwing on an undefined entry.
    const node = this.watchedNodes[path]
    if (!node) return
    node.subject.next(getValueByPointer(data, path))
  }

  /** Type guard: an update is a revision when it carries a `patch` property. */
  private isRevision(update: Update<T>): update is Revision {
    return 'patch' in update
  }
}

/**
 * Returns true when `b` is a prefix of `a`, i.e. every element of `b` equals
 * the element of `a` at the same index. An empty `b` is a prefix of anything.
 */
function startsWith(a: string[], b: string[]): boolean {
  for (let i = 0; i < b.length; i++) {
    if (a[i] !== b[i]) return false
  }
  return true
}