mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
feat(client): remove data mutation (#32)
* feat(client): remove data mutation * chore: address comments and fix some other issues * chore: send the most recent cache upon subscription
This commit is contained in:
@@ -1,8 +1,4 @@
|
|||||||
import { PatchOp } from './types'
|
import { DBCache, PatchOp } from './types'
|
||||||
|
|
||||||
export interface Validator<T> {
|
|
||||||
(operation: Operation, index: number, doc: T, existingPathFragment: string): void
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface BaseOperation {
|
export interface BaseOperation {
|
||||||
path: string
|
path: string
|
||||||
@@ -22,68 +18,68 @@ export interface ReplaceOperation<T> extends BaseOperation {
|
|||||||
value: T
|
value: T
|
||||||
}
|
}
|
||||||
|
|
||||||
export type Doc = { [key: string]: any }
|
export type Operation<T> = AddOperation<T> | RemoveOperation | ReplaceOperation<T>
|
||||||
|
|
||||||
export type Operation = AddOperation<any> | RemoveOperation | ReplaceOperation<any>
|
export function getValueByPointer<T extends Record<string, T>> (data: T, pointer: string): any {
|
||||||
|
if (pointer === '/') return data
|
||||||
|
|
||||||
export function getValueByPointer (doc: any, pointer: string): any {
|
|
||||||
if (pointer === '/') return doc
|
|
||||||
const pathArr = pointer.split('/')
|
|
||||||
pathArr.shift()
|
|
||||||
try {
|
try {
|
||||||
return pathArr.reduce((acc, next) => acc[next], doc)
|
return jsonPathToKeyArray(pointer).reduce((acc, next) => acc[next], data)
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
return undefined
|
return undefined
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function applyOperation (doc: Doc, op: Operation): Operation | null {
|
export function applyOperation<T> (
|
||||||
let undo: Operation | null = null
|
doc: DBCache<Record<string, any>>,
|
||||||
const pathArr = op.path.split('/')
|
{ path, op, value }: Operation<T> & { value?: T },
|
||||||
pathArr.shift()
|
): Operation<T> | null {
|
||||||
pathArr.reduce((node, key, i) => {
|
const current = getValueByPointer(doc.data, path)
|
||||||
if (!isObject) {
|
const remove = { op: PatchOp.REMOVE, path} as const
|
||||||
throw Error('patch cannot be applied. Path contains non object')
|
const add = { op: PatchOp.ADD, path, value: current} as const
|
||||||
}
|
const replace = { op: PatchOp.REPLACE, path, value: current } as const
|
||||||
|
|
||||||
if (i < pathArr.length - 1) {
|
doc.data = recursiveApply(doc.data, jsonPathToKeyArray(path), value)
|
||||||
// iterate node
|
|
||||||
return node[key]
|
|
||||||
}
|
|
||||||
|
|
||||||
// if last key
|
switch (op) {
|
||||||
const curVal = node[key]
|
case PatchOp.REMOVE:
|
||||||
if (op.op === 'add' || op.op === 'replace') {
|
return current === undefined
|
||||||
node[key] = op.value
|
? null
|
||||||
if (curVal) {
|
: add
|
||||||
undo = {
|
case PatchOp.REPLACE:
|
||||||
op: PatchOp.REPLACE,
|
case PatchOp.ADD:
|
||||||
path: op.path,
|
return current === undefined
|
||||||
value: curVal,
|
? remove
|
||||||
}
|
: replace
|
||||||
} else {
|
}
|
||||||
undo = {
|
|
||||||
op: PatchOp.REMOVE,
|
|
||||||
path: op.path,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
delete node[key]
|
|
||||||
if (curVal) {
|
|
||||||
undo = {
|
|
||||||
op: PatchOp.ADD,
|
|
||||||
path: op.path,
|
|
||||||
value: curVal,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, doc)
|
|
||||||
|
|
||||||
return undo
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function isObject (val: any): val is Doc {
|
function recursiveApply<T extends Record<string, T>> (data: T, path: readonly string[], value?: any): T {
|
||||||
|
if (!path.length) return value
|
||||||
|
|
||||||
|
if (!isObject(data)) {
|
||||||
|
throw Error('Patch cannot be applied. Path contains non object')
|
||||||
|
}
|
||||||
|
|
||||||
|
const updated = recursiveApply(data[path[0]], path.slice(1), value)
|
||||||
|
const result = {
|
||||||
|
...data,
|
||||||
|
[path[0]]: updated,
|
||||||
|
}
|
||||||
|
|
||||||
|
if (updated === undefined) {
|
||||||
|
delete result[path[0]]
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
function isObject (val: any): val is Record<string, unknown> {
|
||||||
return typeof val === 'object' && !Array.isArray(val) && val !== null
|
return typeof val === 'object' && !Array.isArray(val) && val !== null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function jsonPathToKeyArray (path: string): string[] {
|
||||||
|
return path.split('/').slice(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { merge, Observable, Subject, Subscription } from 'rxjs'
|
import { merge, Observable, ReplaySubject, Subject, Subscription } from 'rxjs'
|
||||||
import { Store } from './store'
|
import { Store } from './store'
|
||||||
import { DBCache, Http } from './types'
|
import { DBCache, Http } from './types'
|
||||||
import { RPCError } from './source/ws-source'
|
import { RPCError } from './source/ws-source'
|
||||||
@@ -8,7 +8,7 @@ export class PatchDB<T> {
|
|||||||
public store: Store<T> = new Store(this.http, this.initialCache)
|
public store: Store<T> = new Store(this.http, this.initialCache)
|
||||||
public connectionError$ = new Subject<Error>()
|
public connectionError$ = new Subject<Error>()
|
||||||
public rpcError$ = new Subject<RPCError>()
|
public rpcError$ = new Subject<RPCError>()
|
||||||
public cache$ = new Subject<DBCache<T>>()
|
public cache$ = new ReplaySubject<DBCache<T>>(1)
|
||||||
|
|
||||||
private updatesSub?: Subscription
|
private updatesSub?: Subscription
|
||||||
private sourcesSub = this.sources$.subscribe(sources => {
|
private sourcesSub = this.sources$.subscribe(sources => {
|
||||||
@@ -36,8 +36,6 @@ export class PatchDB<T> {
|
|||||||
|
|
||||||
clean () {
|
clean () {
|
||||||
this.sourcesSub.unsubscribe()
|
this.sourcesSub.unsubscribe()
|
||||||
if (this.updatesSub) {
|
this.updatesSub?.unsubscribe()
|
||||||
this.updatesSub.unsubscribe()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,13 +1,13 @@
|
|||||||
import { Observable } from "rxjs";
|
import { Observable } from 'rxjs'
|
||||||
import { map } from "rxjs/operators";
|
import { map } from 'rxjs/operators'
|
||||||
import { Update } from "../types";
|
import { Update } from '../types'
|
||||||
import { Source } from "./source";
|
import { Source } from './source'
|
||||||
import { RPCResponse } from "./ws-source";
|
import { RPCResponse } from './ws-source'
|
||||||
|
|
||||||
export class MockSource<T> implements Source<T> {
|
export class MockSource<T> implements Source<T> {
|
||||||
constructor(private readonly seed: Observable<Update<T>>) {}
|
constructor (private readonly seed: Observable<Update<T>>) { }
|
||||||
|
|
||||||
watch$(): Observable<RPCResponse<Update<T>>> {
|
watch$ (): Observable<RPCResponse<Update<T>>> {
|
||||||
return this.seed.pipe(map((result) => ({ result, jsonrpc: "2.0" })));
|
return this.seed.pipe(map((result) => ({ result, jsonrpc: '2.0' })))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { BehaviorSubject, concat, from, Observable, of, Subject } from 'rxjs'
|
import { BehaviorSubject, concat, from, Observable, of } from 'rxjs'
|
||||||
import { concatMap, delay, map, skip, switchMap, take, tap } from 'rxjs/operators'
|
import { concatMap, delay, map, skip, switchMap, take, tap } from 'rxjs/operators'
|
||||||
import { Store } from '../store'
|
import { Store } from '../store'
|
||||||
import { Http, Update } from '../types'
|
import { Http, Update } from '../types'
|
||||||
|
|||||||
@@ -43,14 +43,6 @@ export interface RPCError extends RPCBase {
|
|||||||
}
|
}
|
||||||
export type RPCResponse<T> = RPCSuccess<T> | RPCError
|
export type RPCResponse<T> = RPCSuccess<T> | RPCError
|
||||||
|
|
||||||
function isRpcError<Error, Result> (arg: { error: Error } | { result: Result}): arg is { error: Error } {
|
|
||||||
return !!(arg as any).error
|
|
||||||
}
|
|
||||||
|
|
||||||
function isRpcSuccess<Error, Result> (arg: { error: Error } | { result: Result}): arg is { result: Result } {
|
|
||||||
return !!(arg as any).result
|
|
||||||
}
|
|
||||||
|
|
||||||
class RpcError {
|
class RpcError {
|
||||||
code: number
|
code: number
|
||||||
message: string
|
message: string
|
||||||
@@ -61,4 +53,4 @@ class RpcError {
|
|||||||
this.message = e.message
|
this.message = e.message
|
||||||
this.details = e.data.details
|
this.details = e.data.details
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import BTree from 'sorted-btree'
|
|||||||
|
|
||||||
export interface StashEntry {
|
export interface StashEntry {
|
||||||
revision: Revision
|
revision: Revision
|
||||||
undo: Operation[]
|
undo: Operation<unknown>[]
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Store<T extends { [key: string]: any }> {
|
export class Store<T extends { [key: string]: any }> {
|
||||||
@@ -70,20 +70,12 @@ export class Store<T extends { [key: string]: any }> {
|
|||||||
this.processStashed(revision.id)
|
this.processStashed(revision.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
private handleDump (dump: Dump<T>): void {
|
private handleDump ({ value, id }: Dump<T>): void {
|
||||||
Object.keys(this.cache.data).forEach(key => {
|
this.cache.data = { ...value }
|
||||||
if (dump.value[key] === undefined) {
|
this.stash.deleteRange(this.cache.sequence, id, false)
|
||||||
delete this.cache.data[key]
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
Object.entries(dump.value).forEach(([key, val]) => {
|
|
||||||
(this.cache.data as any)[key] = val
|
|
||||||
})
|
|
||||||
this.stash.deleteRange(this.cache.sequence, dump.id, false)
|
|
||||||
this.updateWatchedNodes('')
|
this.updateWatchedNodes('')
|
||||||
this.updateSequence(dump.id)
|
this.updateSequence(id)
|
||||||
this.processStashed(dump.id + 1)
|
this.processStashed(id + 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
private processStashed (id: number): void {
|
private processStashed (id: number): void {
|
||||||
@@ -95,9 +87,7 @@ export class Store<T extends { [key: string]: any }> {
|
|||||||
let stashEntry = this.stash.get(this.stash.maxKey() as number)
|
let stashEntry = this.stash.get(this.stash.maxKey() as number)
|
||||||
|
|
||||||
while (stashEntry && stashEntry.revision.id > id) {
|
while (stashEntry && stashEntry.revision.id > id) {
|
||||||
stashEntry.undo.forEach(u => {
|
stashEntry.undo.forEach(u => applyOperation(this.cache, u))
|
||||||
applyOperation(this.cache.data, u)
|
|
||||||
})
|
|
||||||
stashEntry = this.stash.nextLowerPair(stashEntry.revision.id)?.[1]
|
stashEntry = this.stash.nextLowerPair(stashEntry.revision.id)?.[1]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -105,19 +95,17 @@ export class Store<T extends { [key: string]: any }> {
|
|||||||
private applyRevisions (id: number): void {
|
private applyRevisions (id: number): void {
|
||||||
let revision = this.stash.get(id)?.revision
|
let revision = this.stash.get(id)?.revision
|
||||||
while (revision) {
|
while (revision) {
|
||||||
let undo: Operation[] = []
|
let undo: Operation<unknown>[] = []
|
||||||
|
|
||||||
let success = false
|
let success = false
|
||||||
|
|
||||||
try {
|
try {
|
||||||
revision.patch.forEach(op => {
|
revision.patch.forEach(op => {
|
||||||
const u = applyOperation(this.cache.data, op)
|
const u = applyOperation(this.cache, op)
|
||||||
if (u) undo.push(u)
|
if (u) undo.push(u)
|
||||||
})
|
})
|
||||||
success = true
|
success = true
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
undo.forEach(u => {
|
undo.forEach(u => applyOperation(this.cache, u))
|
||||||
applyOperation(this.cache.data, u)
|
|
||||||
})
|
|
||||||
undo = []
|
undo = []
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -163,6 +151,6 @@ export class Store<T extends { [key: string]: any }> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private isRevision (update: Update<T>): update is Revision {
|
private isRevision (update: Update<T>): update is Revision {
|
||||||
return !!(update as Revision).patch
|
return 'patch' in update
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { Operation } from './json-patch-lib'
|
import { Operation } from './json-patch-lib'
|
||||||
|
|
||||||
// revise a collection of nodes.
|
// revise a collection of nodes.
|
||||||
export type Revision = { id: number, patch: Operation[], expireId: string | null }
|
export type Revision = { id: number, patch: Operation<unknown>[], expireId: string | null }
|
||||||
// dump/replace the entire store with T
|
// dump/replace the entire store with T
|
||||||
export type Dump<T> = { id: number, value: T, expireId: string | null }
|
export type Dump<T> = { id: number, value: T, expireId: string | null }
|
||||||
|
|
||||||
@@ -23,7 +23,7 @@ export interface Bootstrapper<T> {
|
|||||||
update (cache: DBCache<T>): Promise<void>
|
update (cache: DBCache<T>): Promise<void>
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface DBCache<T extends { [key: string]: any }>{
|
export interface DBCache<T extends Record<string, any>>{
|
||||||
sequence: number,
|
sequence: number,
|
||||||
data: T
|
data: T
|
||||||
}
|
}
|
||||||
|
|||||||
12
client/package-lock.json
generated
12
client/package-lock.json
generated
@@ -383,9 +383,9 @@
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/minimist": {
|
"node_modules/minimist": {
|
||||||
"version": "1.2.5",
|
"version": "1.2.6",
|
||||||
"resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.5.tgz",
|
"resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.6.tgz",
|
||||||
"integrity": "sha512-FM9nNUYrRBAELZQT3xeZQ7fmMOBg6nWNmJKTcgsJeaLstP/UODVpGsr5OhXhhXg6f+qtJ8uiZ+PUxkDWcgIXLw==",
|
"integrity": "sha512-Jsjnk4bw3YJqYzbdyBiNsPWHPfO++UGG749Cxs6peCu5Xg4nrena6OVxOYxrQTqww0Jmwt+Ref8rggumkTLz9Q==",
|
||||||
"dev": true
|
"dev": true
|
||||||
},
|
},
|
||||||
"node_modules/mkdirp": {
|
"node_modules/mkdirp": {
|
||||||
@@ -905,9 +905,9 @@
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
"minimist": {
|
"minimist": {
|
||||||
"version": "1.2.5",
|
"version": "1.2.6",
|
||||||
"resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.5.tgz",
|
"resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.6.tgz",
|
||||||
"integrity": "sha512-FM9nNUYrRBAELZQT3xeZQ7fmMOBg6nWNmJKTcgsJeaLstP/UODVpGsr5OhXhhXg6f+qtJ8uiZ+PUxkDWcgIXLw==",
|
"integrity": "sha512-Jsjnk4bw3YJqYzbdyBiNsPWHPfO++UGG749Cxs6peCu5Xg4nrena6OVxOYxrQTqww0Jmwt+Ref8rggumkTLz9Q==",
|
||||||
"dev": true
|
"dev": true
|
||||||
},
|
},
|
||||||
"mkdirp": {
|
"mkdirp": {
|
||||||
|
|||||||
Reference in New Issue
Block a user