import { DBCache, Dump, Http, Revision, Update } from './types' import { BehaviorSubject, Observable } from 'rxjs' import { finalize } from 'rxjs/operators' import { applyOperation, getValueByPointer, Operation } from './json-patch-lib' import BTree from 'sorted-btree' export interface StashEntry { revision: Revision undo: Operation[] } export class Store { cache: DBCache sequence$: BehaviorSubject private watchedNodes: { [path: string]: BehaviorSubject } = { } private stash = new BTree() constructor ( private readonly http: Http, private readonly initialCache: DBCache, ) { this.cache = this.initialCache this.sequence$ = new BehaviorSubject(initialCache.sequence) } watch$ (): Observable watch$ (p1: P1): Observable watch$ (p1: P1, p2: P2): Observable watch$ (p1: P1, p2: P2, p3: P3): Observable watch$ (p1: P1, p2: P2, p3: P3, p4: P4): Observable watch$ (p1: P1, p2: P2, p3: P3, p4: P4, p5: P5): Observable watch$ (p1: P1, p2: P2, p3: P3, p4: P4, p5: P5, p6: P6): Observable watch$ (...args: (string | number)[]): Observable { const path = `/${args.join('/')}` if (!this.watchedNodes[path]) { this.watchedNodes[path] = new BehaviorSubject(getValueByPointer(this.cache.data, path)) this.watchedNodes[path].pipe( finalize(() => delete this.watchedNodes[path]), ) } return this.watchedNodes[path].asObservable() } update (update: Update): void { // if old or known, return if (update.id <= this.cache.sequence || this.stash.get(update.id)) return if (this.isRevision(update)) { this.handleRevision(update) } else { this.handleDump(update) } } reset (): void { Object.values(this.watchedNodes).forEach(node => node.complete()) this.stash.clear() } private handleRevision (revision: Revision): void { // stash the revision this.stash.set(revision.id, { revision, undo: [] }) // if revision is futuristic, fetch missing revisions if (revision.id > this.cache.sequence + 1) { this.http.getRevisions(this.cache.sequence) } this.processStashed(revision.id) } private handleDump (dump: Dump): void { this.cache.data = dump.value this.stash.deleteRange(this.cache.sequence, dump.id, false) this.updateWatchedNodes('') this.updateSequence(dump.id) this.processStashed(dump.id + 1) } private processStashed (id: number): void { this.undoRevisions(id) this.applyRevisions(id) } private undoRevisions (id: number): void { let stashEntry = this.stash.get(this.stash.maxKey() as number) while (stashEntry && stashEntry.revision.id > id) { stashEntry.undo.forEach(u => { applyOperation(document, u) }) stashEntry = this.stash.nextLowerPair(stashEntry.revision.id)?.[1] } } private applyRevisions (id: number): void { let revision = this.stash.get(id)?.revision while (revision) { let undo: Operation[] = [] let success = false try { revision.patch.forEach(op => { const u = applyOperation(this.cache.data, op) if (u) undo.push(u) }) success = true } catch (e) { undo.forEach(u => { applyOperation(document, u) }) undo = [] } if (success) { revision.patch.forEach(op => { this.updateWatchedNodes(op.path) }) } if (revision.id === this.cache.sequence + 1) { this.updateSequence(revision.id) } else { this.stash.set(revision.id, { revision, undo }) } // increment revision for next loop revision = this.stash.nextHigherPair(revision.id)?.[1].revision } // delete all old stashed revisions this.stash.deleteRange(0, this.cache.sequence, false) } private updateWatchedNodes (revisionPath: string) { Object.keys(this.watchedNodes).forEach(path => { if (path.includes(revisionPath) || revisionPath.includes(path)) { const val = getValueByPointer(this.cache.data, path) if (val !== undefined) { this.watchedNodes[path].next(val) } else { this.watchedNodes[path].complete() } } }) } private updateSequence (sequence: number): void { this.cache.sequence = sequence this.sequence$.next(sequence) } private isRevision (update: Update): update is Revision { return !!(update as Revision).patch } }