don't emit until bootstrapped with data (#47)

This commit is contained in:
Matt Hill
2022-09-21 10:51:32 -06:00
committed by GitHub
parent 20beb61baa
commit 4d987b1921

View File

@@ -1,5 +1,13 @@
import { Bootstrapper, DBCache, Dump, Revision, Update } from './types'
import { BehaviorSubject, Observable, Subscription, withLatestFrom } from 'rxjs'
import {
BehaviorSubject,
filter,
Observable,
Subscription,
switchMap,
take,
withLatestFrom,
} from 'rxjs'
import {
applyOperation,
getValueByPointer,
@@ -15,7 +23,10 @@ export class PatchDB<T extends { [key: string]: any }> {
}
} = {}
readonly cache$ = new BehaviorSubject({ sequence: 0, data: {} as T })
readonly cache$ = new BehaviorSubject<DBCache<T>>({
sequence: 0,
data: {} as T,
})
constructor(private readonly source$: Observable<Update<T>[]>) {}
@@ -118,18 +129,21 @@ export class PatchDB<T extends { [key: string]: any }> {
>
>
watch$(...args: (string | number)[]): Observable<any> {
return this.cache$.pipe(
filter(({ sequence }) => !!sequence),
take(1),
switchMap(({ data }) => {
const path = args.length ? `/${args.join('/')}` : ''
if (!this.watchedNodes[path]) {
const data = this.cache$.value.data
const value = getValueByPointer(data, path)
this.watchedNodes[path] = {
subject: new BehaviorSubject(value),
pathArr: jsonPathToKeyArray(path),
}
}
return this.watchedNodes[path].subject
}),
)
}
proccessUpdates(updates: Update<T>[], cache: DBCache<T>) {