From bdf0b32be6bba06638fb145c284d9a3e7bd9f447 Mon Sep 17 00:00:00 2001 From: Matt Hill Date: Thu, 1 Sep 2022 11:01:38 -0600 Subject: [PATCH] remove stash and store completely --- client/index.ts | 1 - client/lib/json-patch-lib.ts | 15 +-- client/lib/patch-db.ts | 196 ++++++++++++++++++++++++++++--- client/lib/store.ts | 222 ----------------------------------- client/lib/types.ts | 3 +- 5 files changed, 183 insertions(+), 254 deletions(-) delete mode 100644 client/lib/store.ts diff --git a/client/index.ts b/client/index.ts index f2de622..0561bac 100644 --- a/client/index.ts +++ b/client/index.ts @@ -1,4 +1,3 @@ export * from './lib/json-patch-lib' export * from './lib/patch-db' -export * from './lib/store' export * from './lib/types' diff --git a/client/lib/json-patch-lib.ts b/client/lib/json-patch-lib.ts index adbbf35..10b0d94 100644 --- a/client/lib/json-patch-lib.ts +++ b/client/lib/json-patch-lib.ts @@ -39,21 +39,8 @@ export function getValueByPointer>( export function applyOperation( doc: DBCache>, { path, op, value }: Operation & { value?: T }, -): Operation | null { - const current = getValueByPointer(doc.data, path) - const remove = { op: PatchOp.REMOVE, path } as const - const add = { op: PatchOp.ADD, path, value: current } as const - const replace = { op: PatchOp.REPLACE, path, value: current } as const - +) { doc.data = recursiveApply(doc.data, jsonPathToKeyArray(path), op, value) - - switch (op) { - case PatchOp.REMOVE: - return current === undefined ? null : add - case PatchOp.REPLACE: - case PatchOp.ADD: - return current === undefined ? remove : replace - } } function recursiveApply | any[]>( diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index 3347a1f..29be199 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -1,18 +1,184 @@ -import { map, Observable, shareReplay } from 'rxjs' -import { tap } from 'rxjs/operators' -import { Store } from './store' -import { DBCache, Update } from './types' +import { Bootstrapper, DBCache, Dump, Revision, Update } from './types' +import { BehaviorSubject, Observable, Subscription, withLatestFrom } from 'rxjs' +import { applyOperation, getValueByPointer } from './json-patch-lib' -export class PatchDB { - public store: Store = new Store(this.initialCache) - public cache$ = this.source$.pipe( - tap(res => this.store.update(res)), - map(_ => this.store.cache), - shareReplay(1), - ) +export class PatchDB { + private sub: Subscription | null = null + private watchedNodes: { [path: string]: BehaviorSubject } = {} - constructor( - private readonly source$: Observable>, - private readonly initialCache: DBCache, - ) {} + readonly cache$ = new BehaviorSubject({ sequence: 0, data: {} as T }) + + constructor(private readonly source$: Observable[]>) {} + + async start(bootstrapper: Bootstrapper) { + if (this.sub) return + + const initialCache = await bootstrapper.init() + this.cache$.next(initialCache) + + this.sub = this.source$ + .pipe(withLatestFrom(this.cache$)) + .subscribe(([updates, cache]) => { + this.proccessUpdates(updates, cache) + bootstrapper.update(cache) + }) + } + + stop() { + if (!this.sub) return + + Object.values(this.watchedNodes).forEach(node => node.complete()) + this.watchedNodes = {} + this.sub.unsubscribe() + this.sub = null + this.cache$.next({ sequence: 0, data: {} as T }) + } + + watch$(): Observable + watch$(p1: P1): Observable> + watch$>( + p1: P1, + p2: P2, + ): Observable[P2]>> + watch$< + P1 extends keyof T, + P2 extends keyof NonNullable, + P3 extends keyof NonNullable[P2]>, + >( + p1: P1, + p2: P2, + p3: P3, + ): Observable[P2]>[P3]>> + watch$< + P1 extends keyof T, + P2 extends keyof NonNullable, + P3 extends keyof NonNullable[P2]>, + P4 extends keyof NonNullable[P2]>[P3]>, + >( + p1: P1, + p2: P2, + p3: P3, + p4: P4, + ): Observable< + NonNullable[P2]>[P3]>[P4]> + > + watch$< + P1 extends keyof T, + P2 extends keyof NonNullable, + P3 extends keyof NonNullable[P2]>, + P4 extends keyof NonNullable[P2]>[P3]>, + P5 extends keyof NonNullable< + NonNullable[P2]>[P3]>[P4] + >, + >( + p1: P1, + p2: P2, + p3: P3, + p4: P4, + p5: P5, + ): Observable< + NonNullable< + NonNullable[P2]>[P3]>[P4]>[P5] + > + > + watch$< + P1 extends keyof T, + P2 extends keyof NonNullable, + P3 extends keyof NonNullable[P2]>, + P4 extends keyof NonNullable[P2]>[P3]>, + P5 extends keyof NonNullable< + NonNullable[P2]>[P3]>[P4] + >, + P6 extends keyof NonNullable< + NonNullable[P2]>[P3]>[P4]>[P5] + >, + >( + p1: P1, + p2: P2, + p3: P3, + p4: P4, + p5: P5, + p6: P6, + ): Observable< + NonNullable< + NonNullable< + NonNullable< + NonNullable[P2]>[P3]>[P4] + >[P5] + >[P6] + > + > + watch$(...args: (string | number)[]): Observable { + const path = `/${args.join('/')}` + + return new Observable(subscriber => { + const data = this.cache$.value.data + const value = getValueByPointer(data, path) + const source = this.watchedNodes[path] || new BehaviorSubject(value) + const subscription = source.subscribe(subscriber) + + this.watchedNodes[path] = source + this.updateWatchedNode(path, data) + + return () => { + subscription.unsubscribe() + + if (!source.observed) { + source.complete() + delete this.watchedNodes[path] + } + } + }) + } + + proccessUpdates(updates: Update[], cache: DBCache) { + updates.forEach(update => { + if (update.id <= cache.sequence) return + + if (this.isRevision(update)) { + if (update.id > cache.sequence + 1) { + console.error( + `Received futuristic revision. Expected ${ + cache.sequence + 1 + }, got ${update.id}`, + ) + return + } + this.handleRevision(update, cache) + } else { + this.handleDump(update, cache) + } + cache.sequence++ + }) + this.cache$.next(cache) + } + + private handleRevision(revision: Revision, cache: DBCache): void { + revision.patch.forEach(op => { + applyOperation(cache, op) + this.updateWatchedNodes(op.path, cache.data) + }) + } + + private handleDump(dump: Dump, cache: DBCache): void { + cache.data = { ...dump.value } + this.updateWatchedNodes('', cache.data) + } + + private updateWatchedNodes(revisionPath: string, data: T) { + Object.keys(this.watchedNodes).forEach(path => { + if (path.includes(revisionPath) || revisionPath.includes(path)) { + this.updateWatchedNode(path, data) + } + }) + } + + private updateWatchedNode(path: string, data: T): void { + const value = getValueByPointer(data, path) + this.watchedNodes[path].next(value) + } + + private isRevision(update: Update): update is Revision { + return 'patch' in update + } } diff --git a/client/lib/store.ts b/client/lib/store.ts deleted file mode 100644 index 677128b..0000000 --- a/client/lib/store.ts +++ /dev/null @@ -1,222 +0,0 @@ -import { DBCache, Dump, Revision, Update } from './types' -import { BehaviorSubject, Observable } from 'rxjs' -import { applyOperation, getValueByPointer, Operation } from './json-patch-lib' -import BTree from 'sorted-btree' - -export interface StashEntry { - revision: Revision - undo: Operation[] -} - -export class Store { - readonly sequence$ = new BehaviorSubject(this.cache.sequence) - private watchedNodes: { [path: string]: BehaviorSubject } = {} - private stash = new BTree() - - constructor(public cache: DBCache) {} - - watch$(): Observable - watch$(p1: P1): Observable> - watch$>( - p1: P1, - p2: P2, - ): Observable[P2]>> - watch$< - P1 extends keyof T, - P2 extends keyof NonNullable, - P3 extends keyof NonNullable[P2]>, - >( - p1: P1, - p2: P2, - p3: P3, - ): Observable[P2]>[P3]>> - watch$< - P1 extends keyof T, - P2 extends keyof NonNullable, - P3 extends keyof NonNullable[P2]>, - P4 extends keyof NonNullable[P2]>[P3]>, - >( - p1: P1, - p2: P2, - p3: P3, - p4: P4, - ): Observable< - NonNullable[P2]>[P3]>[P4]> - > - watch$< - P1 extends keyof T, - P2 extends keyof NonNullable, - P3 extends keyof NonNullable[P2]>, - P4 extends keyof NonNullable[P2]>[P3]>, - P5 extends keyof NonNullable< - NonNullable[P2]>[P3]>[P4] - >, - >( - p1: P1, - p2: P2, - p3: P3, - p4: P4, - p5: P5, - ): Observable< - NonNullable< - NonNullable[P2]>[P3]>[P4]>[P5] - > - > - watch$< - P1 extends keyof T, - P2 extends keyof NonNullable, - P3 extends keyof NonNullable[P2]>, - P4 extends keyof NonNullable[P2]>[P3]>, - P5 extends keyof NonNullable< - NonNullable[P2]>[P3]>[P4] - >, - P6 extends keyof NonNullable< - NonNullable[P2]>[P3]>[P4]>[P5] - >, - >( - p1: P1, - p2: P2, - p3: P3, - p4: P4, - p5: P5, - p6: P6, - ): Observable< - NonNullable< - NonNullable< - NonNullable< - NonNullable[P2]>[P3]>[P4] - >[P5] - >[P6] - > - > - watch$(...args: (string | number)[]): Observable { - const path = `/${args.join('/')}` - - return new Observable(subscriber => { - const value = getValueByPointer(this.cache.data, path) - const source = this.watchedNodes[path] || new BehaviorSubject(value) - const subscription = source.subscribe(subscriber) - - this.watchedNodes[path] = source - this.updateValue(path) - - return () => { - subscription.unsubscribe() - - if (!source.observed) { - source.complete() - delete this.watchedNodes[path] - } - } - }) - } - - update(update: Update): void { - if (this.isRevision(update)) { - // Handle revision if new and not known - if (update.id > this.cache.sequence && !this.stash.get(update.id)) { - this.handleRevision(update) - } - } else { - this.handleDump(update) - } - } - - reset(): void { - Object.values(this.watchedNodes).forEach(node => node.complete()) - this.watchedNodes = {} - this.stash.clear() - this.sequence$.next(0) - this.cache = { - sequence: 0, - data: {} as any, - } - } - - private updateValue(path: string): void { - const value = getValueByPointer(this.cache.data, path) - - this.watchedNodes[path].next(value) - } - - private handleRevision(revision: Revision): void { - this.stash.set(revision.id, { revision, undo: [] }) - this.processStashed(revision.id) - } - - private handleDump({ value, id }: Dump): void { - this.cache.data = { ...value } - this.stash.deleteRange(this.cache.sequence, id, false) - this.updateWatchedNodes('') - this.updateSequence(id) - this.processStashed(id + 1) - } - - private processStashed(id: number): void { - this.undoRevisions(id) - this.applyRevisions(id) - } - - private undoRevisions(id: number): void { - let stashEntry = this.stash.get(this.stash.maxKey() as number) - - while (stashEntry && stashEntry.revision.id > id) { - stashEntry.undo.forEach(u => applyOperation(this.cache, u)) - stashEntry = this.stash.nextLowerPair(stashEntry.revision.id)?.[1] - } - } - - private applyRevisions(id: number): void { - let revision = this.stash.get(id)?.revision - while (revision) { - let undo: Operation[] = [] - let success = false - - try { - revision.patch.forEach(op => { - const u = applyOperation(this.cache, op) - if (u) undo.push(u) - }) - success = true - } catch (e) { - undo.forEach(u => applyOperation(this.cache, u)) - undo = [] - } - - if (success) { - revision.patch.forEach(op => { - this.updateWatchedNodes(op.path) - }) - } - - if (revision.id === this.cache.sequence + 1) { - this.updateSequence(revision.id) - } else { - this.stash.set(revision.id, { revision, undo }) - } - - // increment revision for next loop - revision = this.stash.nextHigherPair(revision.id)?.[1].revision - } - - // delete all old stashed revisions - this.stash.deleteRange(0, this.cache.sequence, false) - } - - private updateWatchedNodes(revisionPath: string) { - Object.keys(this.watchedNodes).forEach(path => { - if (path.includes(revisionPath) || revisionPath.includes(path)) { - this.updateValue(path) - } - }) - } - - private updateSequence(sequence: number): void { - this.cache.sequence = sequence - this.sequence$.next(sequence) - } - - private isRevision(update: Update): update is Revision { - return 'patch' in update - } -} diff --git a/client/lib/types.ts b/client/lib/types.ts index 0a9aa2f..cb4ece6 100644 --- a/client/lib/types.ts +++ b/client/lib/types.ts @@ -4,11 +4,10 @@ import { Operation } from './json-patch-lib' export type Revision = { id: number patch: Operation[] - expireId: string | null } // dump/replace the entire store with T -export type Dump = { id: number; value: T; expireId: string | null } +export type Dump = { id: number; value: T } export type Update = Revision | Dump