much more efficient (#46)

* much more efficient

* update children too

* better compare
This commit is contained in:
Matt Hill
2022-09-14 11:21:00 -06:00
committed by GitHub
parent 74c01eb5db
commit 20beb61baa
2 changed files with 46 additions and 35 deletions

View File

@@ -43,6 +43,10 @@ export function applyOperation<T>(
doc.data = recursiveApply(doc.data, jsonPathToKeyArray(path), op, value)
}
export function jsonPathToKeyArray(path: string): string[] {
return path.split('/').slice(1)
}
function recursiveApply<T extends Record<string, any> | any[]>(
data: T,
path: readonly string[],
@@ -105,7 +109,3 @@ function recursiveApplyArray<T extends any[]>(
function isObject(val: any): val is Record<string, unknown> {
return typeof val === 'object' && val !== null && !Array.isArray(val)
}
function jsonPathToKeyArray(path: string): string[] {
return path.split('/').slice(1)
}

View File

@@ -1,10 +1,19 @@
import { Bootstrapper, DBCache, Dump, Revision, Update } from './types'
import { BehaviorSubject, Observable, Subscription, withLatestFrom } from 'rxjs'
import { applyOperation, getValueByPointer } from './json-patch-lib'
import {
applyOperation,
getValueByPointer,
jsonPathToKeyArray,
} from './json-patch-lib'
export class PatchDB<T extends { [key: string]: any }> {
private sub: Subscription | null = null
private watchedNodes: { [path: string]: BehaviorSubject<any> } = {}
private watchedNodes: {
[path: string]: {
subject: BehaviorSubject<any>
pathArr: string[]
}
} = {}
readonly cache$ = new BehaviorSubject({ sequence: 0, data: {} as T })
@@ -27,7 +36,7 @@ export class PatchDB<T extends { [key: string]: any }> {
stop() {
if (!this.sub) return
Object.values(this.watchedNodes).forEach(node => node.complete())
Object.values(this.watchedNodes).forEach(node => node.subject.complete())
this.watchedNodes = {}
this.sub.unsubscribe()
this.sub = null
@@ -109,40 +118,30 @@ export class PatchDB<T extends { [key: string]: any }> {
>
>
watch$(...args: (string | number)[]): Observable<any> {
const path = `/${args.join('/')}`
const path = args.length ? `/${args.join('/')}` : ''
return new Observable(subscriber => {
if (!this.watchedNodes[path]) {
const data = this.cache$.value.data
const value = getValueByPointer(data, path)
const source = this.watchedNodes[path] || new BehaviorSubject(value)
const subscription = source.subscribe(subscriber)
this.watchedNodes[path] = source
this.updateWatchedNode(path, data)
return () => {
subscription.unsubscribe()
if (!source.observed) {
source.complete()
delete this.watchedNodes[path]
}
this.watchedNodes[path] = {
subject: new BehaviorSubject(value),
pathArr: jsonPathToKeyArray(path),
}
})
}
return this.watchedNodes[path].subject
}
proccessUpdates(updates: Update<T>[], cache: DBCache<T>) {
updates.forEach(update => {
if (update.id <= cache.sequence) return
if (this.isRevision(update)) {
if (update.id > cache.sequence + 1) {
console.error(
`Received futuristic revision. Expected ${
cache.sequence + 1
}, got ${update.id}`,
const expected = cache.sequence + 1
if (update.id < expected) return
if (update.id > expected) {
return console.error(
// unreachable
`Received futuristic revision. Expected ${expected}, got ${update.id}`,
)
return
}
this.handleRevision(update, cache)
} else {
@@ -154,8 +153,12 @@ export class PatchDB<T extends { [key: string]: any }> {
}
private handleRevision(revision: Revision, cache: DBCache<T>): void {
// apply opperations
revision.patch.forEach(op => {
applyOperation(cache, op)
})
// update watched nodes
revision.patch.forEach(op => {
this.updateWatchedNodes(op.path, cache.data)
})
}
@@ -165,9 +168,10 @@ export class PatchDB<T extends { [key: string]: any }> {
this.updateWatchedNodes('', cache.data)
}
private updateWatchedNodes(revisionPath: string, data: T) {
Object.keys(this.watchedNodes).forEach(path => {
if (path.includes(revisionPath) || revisionPath.includes(path)) {
private updateWatchedNodes(revisionPath: string, data: T): void {
const r = jsonPathToKeyArray(revisionPath)
Object.entries(this.watchedNodes).forEach(([path, { pathArr }]) => {
if (startsWith(pathArr, r) || startsWith(r, pathArr)) {
this.updateWatchedNode(path, data)
}
})
@@ -175,10 +179,17 @@ export class PatchDB<T extends { [key: string]: any }> {
private updateWatchedNode(path: string, data: T): void {
const value = getValueByPointer(data, path)
this.watchedNodes[path].next(value)
this.watchedNodes[path].subject.next(value)
}
private isRevision(update: Update<T>): update is Revision {
return 'patch' in update
}
}
function startsWith(a: string[], b: string[]) {
for (let i = 0; i < b.length; i++) {
if (a[i] !== b[i]) return false
}
return true
}