From 0cbdc781b0aceab2e6d7e1a8bb0708b9d691ef5e Mon Sep 17 00:00:00 2001 From: Matt Hill Date: Wed, 7 Jul 2021 15:28:52 -0600 Subject: [PATCH] re-introduce watch --- client/lib/store.ts | 46 +++++++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/client/lib/store.ts b/client/lib/store.ts index 61dd9e9..0596538 100644 --- a/client/lib/store.ts +++ b/client/lib/store.ts @@ -1,11 +1,12 @@ import { DBCache, Dump, Revision, Update } from './types' -import { applyPatch } from 'fast-json-patch' -import { BehaviorSubject, from, Observable } from 'rxjs' -import { toStream } from 'mobx-utils' +import { applyPatch, getValueByPointer } from 'fast-json-patch' +import { BehaviorSubject, Observable } from 'rxjs' +import { finalize } from 'rxjs/operators' export class Store { cache: DBCache sequence$: BehaviorSubject + private nodes: { [path: string]: BehaviorSubject } = { } constructor ( readonly initialCache: DBCache, @@ -22,30 +23,47 @@ export class Store { watch$ (p1: P1, p2: P2, p3: P3, p4: P4, p5: P5): Observable watch$ (p1: P1, p2: P2, p3: P3, p4: P4, p5: P5, p6: P6): Observable watch$ (...args: (string | number)[]): Observable { - return from(toStream(() => this.peekNode(...args), true)) + const path = `/${args.join('/')}` + if (!this.nodes[path]) { + this.nodes[path] = new BehaviorSubject(getValueByPointer(this.cache.data, path)) + this.nodes[path].pipe( + finalize(() => delete this.nodes[path]), + ) + } + return this.nodes[path].asObservable() } update (update: Update): void { if ((update as Revision).patch) { if (this.cache.sequence + 1 !== update.id) throw new Error(`Outdated sequence: current: ${this.cache.sequence}, new: ${update.id}`) - applyPatch(this.cache.data, (update as Revision).patch, true, true) + applyPatch(this.cache.data, (update as Revision).patch, true, true); + (update as Revision).patch.forEach(op => { + this.updateNodesByPath(op.path) + }) + } else { this.cache.data = (update as Dump).value + this.updateNodesByPath('') } this.cache.sequence = update.id this.sequence$.next(this.cache.sequence) } - reset (): void { - this.cache.sequence = 0 - this.cache.data = { } as T + updateNodesByPath (revisionPath: string) { + Object.keys(this.nodes).forEach(nodePath => { + if (!this.nodes[nodePath]) return + if (nodePath.includes(revisionPath) || revisionPath.includes(nodePath)) { + try { + this.nodes[nodePath].next(getValueByPointer(this.cache.data, nodePath)) + } catch (e) { + this.nodes[nodePath].complete() + delete this.nodes[nodePath] + } + } + }) } - private peekNode (...args: (string | number)[]): any { - try { - return args.reduce((acc, next) => (acc as any)[`${next}`], this.cache.data) - } catch (e) { - return undefined - } + reset (): void { + Object.values(this.nodes).forEach(node => node.complete()) } }