From 770602169e65c55d640f98a1f819f9bbabf3ba18 Mon Sep 17 00:00:00 2001 From: waterplea Date: Thu, 26 May 2022 16:52:00 +0300 Subject: [PATCH] feat: make `watch$` strictNullCheck compatible --- client/lib/store.ts | 61 +++++++++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/client/lib/store.ts b/client/lib/store.ts index aa5a7eb..6770703 100644 --- a/client/lib/store.ts +++ b/client/lib/store.ts @@ -1,5 +1,5 @@ import { DBCache, Dump, Http, Revision, Update } from './types' -import { BehaviorSubject, Observable } from 'rxjs' +import { BehaviorSubject, Observable, ReplaySubject } from 'rxjs' import { applyOperation, getValueByPointer, Operation } from './json-patch-lib' import BTree from 'sorted-btree' @@ -11,10 +11,10 @@ export interface StashEntry { export class Store { cache: DBCache sequence$: BehaviorSubject - private watchedNodes: { [path: string]: BehaviorSubject } = { } + private watchedNodes: { [path: string]: ReplaySubject } = {} private stash = new BTree() - constructor ( + constructor( private readonly http: Http, private readonly initialCache: DBCache, ) { @@ -22,24 +22,24 @@ export class Store { this.sequence$ = new BehaviorSubject(initialCache.sequence) } - watch$ (): Observable - watch$ (p1: P1): Observable - watch$ (p1: P1, p2: P2): Observable - watch$ (p1: P1, p2: P2, p3: P3): Observable - watch$ (p1: P1, p2: P2, p3: P3, p4: P4): Observable - watch$ (p1: P1, p2: P2, p3: P3, p4: P4, p5: P5): Observable - watch$ (p1: P1, p2: P2, p3: P3, p4: P4, p5: P5, p6: P6): Observable - watch$ (...args: (string | number)[]): Observable { + watch$(): Observable + watch$(p1: P1): Observable> + watch$>(p1: P1, p2: P2): Observable[P2]>> + watch$, P3 extends keyof NonNullable[P2]>>(p1: P1, p2: P2, p3: P3): Observable[P2]>[P3]>> + watch$, P3 extends keyof NonNullable[P2]>, P4 extends keyof NonNullable[P2]>[P3]>>(p1: P1, p2: P2, p3: P3, p4: P4): Observable[P2]>[P3]>[P4]>> + watch$, P3 extends keyof NonNullable[P2]>, P4 extends keyof NonNullable[P2]>[P3]>, P5 extends keyof NonNullable[P2]>[P3]>[P4]>>(p1: P1, p2: P2, p3: P3, p4: P4, p5: P5): Observable[P2]>[P3]>[P4]>[P5]>> + watch$, P3 extends keyof NonNullable[P2]>, P4 extends keyof NonNullable[P2]>[P3]>, P5 extends keyof NonNullable[P2]>[P3]>[P4]>, P6 extends keyof NonNullable[P2]>[P3]>[P4]>[P5]>>(p1: P1, p2: P2, p3: P3, p4: P4, p5: P5, p6: P6): Observable[P2]>[P3]>[P4]>[P5]>[P6]>> + watch$(...args: (string | number)[]): Observable { const path = `/${args.join('/')}` if (!this.watchedNodes[path]) { - this.watchedNodes[path] = new BehaviorSubject(getValueByPointer(this.cache.data, path)) + this.watchedNodes[path] = new ReplaySubject(1) } return this.watchedNodes[path].asObservable() } - update (update: Update): void { + update(update: Update): void { if (this.isRevision(update)) { - // if old or known, return + // if old or known, return if (update.id <= this.cache.sequence || this.stash.get(update.id)) return this.handleRevision(update) } else { @@ -47,18 +47,26 @@ export class Store { } } - reset (): void { + reset(): void { Object.values(this.watchedNodes).forEach(node => node.complete()) - this.watchedNodes = { } + this.watchedNodes = {} this.stash.clear() this.sequence$.next(0) this.cache = { sequence: 0, - data: { } as any, + data: {} as any, } } - private handleRevision (revision: Revision): void { + private updateValue(path: string): void { + const value = getValueByPointer(this.cache.data, path) + + if (value !== undefined) { + this.watchedNodes[path].next(value) + } + } + + private handleRevision(revision: Revision): void { // stash the revision this.stash.set(revision.id, { revision, undo: [] }) @@ -70,7 +78,7 @@ export class Store { this.processStashed(revision.id) } - private handleDump ({ value, id }: Dump): void { + private handleDump({ value, id }: Dump): void { this.cache.data = { ...value } this.stash.deleteRange(this.cache.sequence, id, false) this.updateWatchedNodes('') @@ -78,12 +86,12 @@ export class Store { this.processStashed(id + 1) } - private processStashed (id: number): void { + private processStashed(id: number): void { this.undoRevisions(id) this.applyRevisions(id) } - private undoRevisions (id: number): void { + private undoRevisions(id: number): void { let stashEntry = this.stash.get(this.stash.maxKey() as number) while (stashEntry && stashEntry.revision.id > id) { @@ -92,7 +100,7 @@ export class Store { } } - private applyRevisions (id: number): void { + private applyRevisions(id: number): void { let revision = this.stash.get(id)?.revision while (revision) { let undo: Operation[] = [] @@ -129,7 +137,7 @@ export class Store { this.stash.deleteRange(0, this.cache.sequence, false) } - private updateWatchedNodes (revisionPath: string) { + private updateWatchedNodes(revisionPath: string) { const kill = (path: string) => { this.watchedNodes[path].complete() delete this.watchedNodes[path] @@ -139,18 +147,17 @@ export class Store { if (this.watchedNodes[path].observers.length === 0) return kill(path) if (path.includes(revisionPath) || revisionPath.includes(path)) { - const val = getValueByPointer(this.cache.data, path) - this.watchedNodes[path].next(val) + this.updateValue(path) } }) } - private updateSequence (sequence: number): void { + private updateSequence(sequence: number): void { this.cache.sequence = sequence this.sequence$.next(sequence) } - private isRevision (update: Update): update is Revision { + private isRevision(update: Update): update is Revision { return 'patch' in update } }