From 20beb61baace9d25a8c50e4b552d59a86eb84dad Mon Sep 17 00:00:00 2001 From: Matt Hill Date: Wed, 14 Sep 2022 11:21:00 -0600 Subject: [PATCH] much more efficient (#46) * much more efficient * update children too * better compare --- client/lib/json-patch-lib.ts | 8 ++-- client/lib/patch-db.ts | 73 +++++++++++++++++++++--------------- 2 files changed, 46 insertions(+), 35 deletions(-) diff --git a/client/lib/json-patch-lib.ts b/client/lib/json-patch-lib.ts index 10b0d94..8748b89 100644 --- a/client/lib/json-patch-lib.ts +++ b/client/lib/json-patch-lib.ts @@ -43,6 +43,10 @@ export function applyOperation( doc.data = recursiveApply(doc.data, jsonPathToKeyArray(path), op, value) } +export function jsonPathToKeyArray(path: string): string[] { + return path.split('/').slice(1) +} + function recursiveApply | any[]>( data: T, path: readonly string[], @@ -105,7 +109,3 @@ function recursiveApplyArray( function isObject(val: any): val is Record { return typeof val === 'object' && val !== null && !Array.isArray(val) } - -function jsonPathToKeyArray(path: string): string[] { - return path.split('/').slice(1) -} diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index bd05301..dd59c7f 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -1,10 +1,19 @@ import { Bootstrapper, DBCache, Dump, Revision, Update } from './types' import { BehaviorSubject, Observable, Subscription, withLatestFrom } from 'rxjs' -import { applyOperation, getValueByPointer } from './json-patch-lib' +import { + applyOperation, + getValueByPointer, + jsonPathToKeyArray, +} from './json-patch-lib' export class PatchDB { private sub: Subscription | null = null - private watchedNodes: { [path: string]: BehaviorSubject } = {} + private watchedNodes: { + [path: string]: { + subject: BehaviorSubject + pathArr: string[] + } + } = {} readonly cache$ = new BehaviorSubject({ sequence: 0, data: {} as T }) @@ -27,7 +36,7 @@ export class PatchDB { stop() { if (!this.sub) return - Object.values(this.watchedNodes).forEach(node => node.complete()) + Object.values(this.watchedNodes).forEach(node => node.subject.complete()) this.watchedNodes = {} this.sub.unsubscribe() this.sub = null @@ -109,40 +118,30 @@ export class PatchDB { > > watch$(...args: (string | number)[]): Observable { - const path = `/${args.join('/')}` + const path = args.length ? `/${args.join('/')}` : '' - return new Observable(subscriber => { + if (!this.watchedNodes[path]) { const data = this.cache$.value.data const value = getValueByPointer(data, path) - const source = this.watchedNodes[path] || new BehaviorSubject(value) - const subscription = source.subscribe(subscriber) - - this.watchedNodes[path] = source - this.updateWatchedNode(path, data) - - return () => { - subscription.unsubscribe() - - if (!source.observed) { - source.complete() - delete this.watchedNodes[path] - } + this.watchedNodes[path] = { + subject: new BehaviorSubject(value), + pathArr: jsonPathToKeyArray(path), } - }) + } + + return this.watchedNodes[path].subject } proccessUpdates(updates: Update[], cache: DBCache) { updates.forEach(update => { - if (update.id <= cache.sequence) return - if (this.isRevision(update)) { - if (update.id > cache.sequence + 1) { - console.error( - `Received futuristic revision. Expected ${ - cache.sequence + 1 - }, got ${update.id}`, + const expected = cache.sequence + 1 + if (update.id < expected) return + if (update.id > expected) { + return console.error( + // unreachable + `Received futuristic revision. Expected ${expected}, got ${update.id}`, ) - return } this.handleRevision(update, cache) } else { @@ -154,8 +153,12 @@ export class PatchDB { } private handleRevision(revision: Revision, cache: DBCache): void { + // apply opperations revision.patch.forEach(op => { applyOperation(cache, op) + }) + // update watched nodes + revision.patch.forEach(op => { this.updateWatchedNodes(op.path, cache.data) }) } @@ -165,9 +168,10 @@ export class PatchDB { this.updateWatchedNodes('', cache.data) } - private updateWatchedNodes(revisionPath: string, data: T) { - Object.keys(this.watchedNodes).forEach(path => { - if (path.includes(revisionPath) || revisionPath.includes(path)) { + private updateWatchedNodes(revisionPath: string, data: T): void { + const r = jsonPathToKeyArray(revisionPath) + Object.entries(this.watchedNodes).forEach(([path, { pathArr }]) => { + if (startsWith(pathArr, r) || startsWith(r, pathArr)) { this.updateWatchedNode(path, data) } }) @@ -175,10 +179,17 @@ export class PatchDB { private updateWatchedNode(path: string, data: T): void { const value = getValueByPointer(data, path) - this.watchedNodes[path].next(value) + this.watchedNodes[path].subject.next(value) } private isRevision(update: Update): update is Revision { return 'patch' in update } } + +function startsWith(a: string[], b: string[]) { + for (let i = 0; i < b.length; i++) { + if (a[i] !== b[i]) return false + } + return true +}