From 731e5129d85ca1e859e18279ff584571295690f0 Mon Sep 17 00:00:00 2001 From: waterplea Date: Sun, 21 Aug 2022 08:20:39 +0300 Subject: [PATCH] fix: properly handle watch$ termination --- client/lib/store.ts | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/client/lib/store.ts b/client/lib/store.ts index de7d5e9..677128b 100644 --- a/client/lib/store.ts +++ b/client/lib/store.ts @@ -1,5 +1,5 @@ import { DBCache, Dump, Revision, Update } from './types' -import { BehaviorSubject, Observable, ReplaySubject } from 'rxjs' +import { BehaviorSubject, Observable } from 'rxjs' import { applyOperation, getValueByPointer, Operation } from './json-patch-lib' import BTree from 'sorted-btree' @@ -10,7 +10,7 @@ export interface StashEntry { export class Store { readonly sequence$ = new BehaviorSubject(this.cache.sequence) - private watchedNodes: { [path: string]: ReplaySubject } = {} + private watchedNodes: { [path: string]: BehaviorSubject } = {} private stash = new BTree() constructor(public cache: DBCache) {} @@ -91,11 +91,24 @@ export class Store { > watch$(...args: (string | number)[]): Observable { const path = `/${args.join('/')}` - if (!this.watchedNodes[path]) { - this.watchedNodes[path] = new ReplaySubject(1) + + return new Observable(subscriber => { + const value = getValueByPointer(this.cache.data, path) + const source = this.watchedNodes[path] || new BehaviorSubject(value) + const subscription = source.subscribe(subscriber) + + this.watchedNodes[path] = source this.updateValue(path) - } - return this.watchedNodes[path].asObservable() + + return () => { + subscription.unsubscribe() + + if (!source.observed) { + source.complete() + delete this.watchedNodes[path] + } + } + }) } update(update: Update): void { @@ -191,14 +204,7 @@ export class Store { } private updateWatchedNodes(revisionPath: string) { - const kill = (path: string) => { - this.watchedNodes[path].complete() - delete this.watchedNodes[path] - } - Object.keys(this.watchedNodes).forEach(path => { - if (this.watchedNodes[path].observers.length === 0) return kill(path) - if (path.includes(revisionPath) || revisionPath.includes(path)) { this.updateValue(path) }