From efddb7fac03e0fec02c16f7c7436acb00cd7f8f3 Mon Sep 17 00:00:00 2001 From: Matt Hill Date: Wed, 7 Jul 2021 19:18:08 -0600 Subject: [PATCH] stashing to handle out-of-sequence revisions --- client/lib/patch-db.ts | 5 +-- client/lib/store.ts | 75 +++++++++++++++++++++++++++++++--------- client/package-lock.json | 11 ++++++ client/package.json | 1 + 4 files changed, 74 insertions(+), 18 deletions(-) diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index 5eccd34..a56a439 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -2,7 +2,7 @@ import { merge, Observable, of } from 'rxjs' import { concatMap, finalize, tap } from 'rxjs/operators' import { Source } from './source/source' import { Store } from './store' -import { DBCache } from './types' +import { DBCache, Http } from './types' export { Operation } from 'fast-json-patch' @@ -11,9 +11,10 @@ export class PatchDB { constructor ( private readonly sources: Source[], + private readonly http: Http, private readonly initialCache: DBCache, ) { - this.store = new Store(this.initialCache) + this.store = new Store(this.http, this.initialCache) } sync$ (): Observable> { diff --git a/client/lib/store.ts b/client/lib/store.ts index 0596538..88aea03 100644 --- a/client/lib/store.ts +++ b/client/lib/store.ts @@ -1,17 +1,20 @@ -import { DBCache, Dump, Revision, Update } from './types' +import { DBCache, Dump, Http, Revision, Update } from './types' import { applyPatch, getValueByPointer } from 'fast-json-patch' import { BehaviorSubject, Observable } from 'rxjs' import { finalize } from 'rxjs/operators' +import BTree from 'sorted-btree' export class Store { cache: DBCache sequence$: BehaviorSubject private nodes: { [path: string]: BehaviorSubject } = { } + private stashed = new BTree() constructor ( - readonly initialCache: DBCache, + private readonly http: Http, + private readonly initialCache: DBCache, ) { - this.cache = initialCache + this.cache = this.initialCache this.sequence$ = new BehaviorSubject(initialCache.sequence) } @@ -34,22 +37,57 @@ export class Store { } update (update: Update): void { - if ((update as Revision).patch) { - if (this.cache.sequence + 1 !== update.id) throw new Error(`Outdated sequence: current: ${this.cache.sequence}, new: ${update.id}`) - applyPatch(this.cache.data, (update as Revision).patch, true, true); - (update as Revision).patch.forEach(op => { - this.updateNodesByPath(op.path) - }) + // if stale, return + if (update.id <= this.cache.sequence) return + if (this.isRevision(update)) { + this.handleRevision(update) } else { - this.cache.data = (update as Dump).value - this.updateNodesByPath('') + this.handleDump(update) } - this.cache.sequence = update.id - this.sequence$.next(this.cache.sequence) } - updateNodesByPath (revisionPath: string) { + reset (): void { + Object.values(this.nodes).forEach(node => node.complete()) + this.stashed.clear() + } + + private handleRevision (revision: Revision): void { + // stash the revision + this.stashed.set(revision.id, revision) + // if revision is futuristic, fetch missing revisions and return + if (revision.id > this.cache.sequence + 1) { + this.http.getRevisions(this.cache.sequence) + return + // if revision is next in line, apply contiguous stashed + } else { + this.processStashed(revision.id) + } + } + + private handleDump (dump: Dump): void { + this.cache.data = dump.value + this.stashed.deleteRange(this.cache.sequence, dump.id, false) + this.updateNodesByPath('') + this.updateSequence(dump.id) + this.processStashed(dump.id + 1) + } + + private processStashed (id: number): void { + while (true) { + const revision = this.stashed.get(id) + if (!revision) break + applyPatch(this.cache.data, revision.patch, true, true) + revision.patch.map(op => { + this.updateNodesByPath(op.path) + }) + this.updateSequence(id) + id++ + } + this.stashed.deleteRange(0, id, false) + } + + private updateNodesByPath (revisionPath: string) { Object.keys(this.nodes).forEach(nodePath => { if (!this.nodes[nodePath]) return if (nodePath.includes(revisionPath) || revisionPath.includes(nodePath)) { @@ -63,7 +101,12 @@ export class Store { }) } - reset (): void { - Object.values(this.nodes).forEach(node => node.complete()) + private updateSequence (sequence: number): void { + this.cache.sequence = sequence + this.sequence$.next(sequence) + } + + private isRevision (update: Update): update is Revision { + return !!(update as Revision).patch } } diff --git a/client/package-lock.json b/client/package-lock.json index d083b8e..95f8b4b 100644 --- a/client/package-lock.json +++ b/client/package-lock.json @@ -14,6 +14,7 @@ "mobx": "^6.1.4", "mobx-utils": "^6.0.3", "rxjs": "^6.6.3", + "sorted-btree": "^1.5.0", "uuid": "^8.3.2" }, "devDependencies": { @@ -1178,6 +1179,11 @@ "randombytes": "^2.1.0" } }, + "node_modules/sorted-btree": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/sorted-btree/-/sorted-btree-1.5.0.tgz", + "integrity": "sha512-1KzY80r3VpwGLGN/9oWjReUml3czxKfLz4iMV8Ro9KAHCg9xt0HwTkcb20JR+sHCiR5WUJ6uMAbe/HB3gy1qYA==" + }, "node_modules/source-map": { "version": "0.6.1", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", @@ -2562,6 +2568,11 @@ "randombytes": "^2.1.0" } }, + "sorted-btree": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/sorted-btree/-/sorted-btree-1.5.0.tgz", + "integrity": "sha512-1KzY80r3VpwGLGN/9oWjReUml3czxKfLz4iMV8Ro9KAHCg9xt0HwTkcb20JR+sHCiR5WUJ6uMAbe/HB3gy1qYA==" + }, "source-map": { "version": "0.6.1", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", diff --git a/client/package.json b/client/package.json index e1d531c..2eebd2a 100644 --- a/client/package.json +++ b/client/package.json @@ -16,6 +16,7 @@ "mobx": "^6.1.4", "mobx-utils": "^6.0.3", "rxjs": "^6.6.3", + "sorted-btree": "^1.5.0", "uuid": "^8.3.2" }, "devDependencies": {