removes temps, massive refactor

This commit is contained in:
Matt Hill
2021-06-18 15:27:36 -06:00
committed by Aiden McClelland
parent 0dcbb3956e
commit 950773e542
19 changed files with 93 additions and 1135 deletions

View File

@@ -1,44 +0,0 @@
import { Subject, BehaviorSubject, of, Observable, Observer, throwError } from 'rxjs'
import { concatMap, map, catchError, filter, take } from 'rxjs/operators'
export type Action<T> = {
action: () => T,
notify: BehaviorSubject<undefined | T>
}
export class ActionSerializer {
private readonly sequentialActions = new Subject<Action<any>>()
constructor () {
this.sequentialActions.pipe(
concatMap(({ action, notify }) => fromSync$(action).pipe(
catchError(e => of(notify.next({ error: e }))),
map(result => notify.next({ result })),
)),
catchError(e => of(console.error(`Action Serializer Exception`, e))),
).subscribe()
}
run$<T> (action: () => T): Observable<T> {
const notify = new BehaviorSubject(undefined) as BehaviorSubject<T | undefined>
this.sequentialActions.next({ action, notify })
return (notify as BehaviorSubject<T>).pipe(
filter(res => res !== undefined),
take(1),
concatMap((res: any) => res.error ? throwError(res.error) : of(res.result)),
)
}
}
function fromSync$<S, T> (action: (s: S) => T, s: S): Observable<T>
function fromSync$<T> (action: () => T): Observable<T>
function fromSync$<S, T> (action: (s: S) => T, s?: S): Observable<T> {
return new Observable((subscriber: Observer<T>) => {
try {
subscriber.next(action(s as S))
subscriber.complete()
} catch (e) {
subscriber.error(e)
}
})
}

View File

@@ -1,133 +1,33 @@
import { EMPTY, from, merge, Observable, of, Subject, timer } from 'rxjs'
import { catchError, concatMap, debounce, debounceTime, delay, finalize, map, skip, take, takeUntil, tap, throttleTime } from 'rxjs/operators'
import { merge, Observable, of } from 'rxjs'
import { concatMap, finalize, map, tap } from 'rxjs/operators'
import { Source } from './source/source'
import { Dump, SequenceStore, Result, Revision } from './sequence-store'
import { Store } from './store'
import { DBCache } from './types'
export { Operation } from 'fast-json-patch'
export class PatchDB<T extends object> {
private readonly cancelStashTimeout = new Subject()
store: Store<T>
private constructor (
private readonly sources: Source<T>[],
private readonly http: Http<T>,
private readonly sequenceStore: SequenceStore<T>,
private readonly timeoutForMissingRevision: number = 5000,
) { }
get store (): Store<T> { return this.sequenceStore.store }
static async init<T extends object> (conf: PatchDbConfig<T>): Promise<PatchDB<T>> {
console.log('PATCHDB - init(): ', conf)
const { sources, http, bootstrapper, timeoutForMissingRevision } = conf
let sequence: number = 0
let data: T = { } as T
try {
const cache = await bootstrapper.init()
console.log('bootstrapped: ', cache)
sequence = cache.sequence
data = cache.data
} catch (e) {
// @TODO what to do if bootstrapper fails?
console.error('bootstrapper failed: ', e)
}
const store = new Store(data)
const sequenceStore = new SequenceStore(store, sequence)
// update cache when sequenceStore emits, throttled
sequenceStore.watch$().pipe(debounceTime(500), skip(1)).subscribe(({ data, sequence }) => {
console.log('PATCHDB - update cache(): ', sequence, data)
bootstrapper.update({ sequence, data }).catch(e => {
console.error('Exception in updateCache: ', e)
})
})
return new PatchDB(sources, http, sequenceStore, timeoutForMissingRevision)
constructor (
private readonly source: Source<T>,
readonly cache: DBCache<T>,
) {
this.store = new Store(cache)
}
sync$ (): Observable<void> {
sync$ (): Observable<DBCache<T>> {
console.log('PATCHDB - sync$()')
const sequence$ = this.sequenceStore.watch$().pipe(map(cache => cache.sequence))
const sequence$ = this.store.watchAll$().pipe(map(cache => cache.sequence))
// nested concatMaps, as it is written, ensure sync is not run for update2 until handleSyncResult is complete for update1.
// flat concatMaps would allow many syncs to run while handleSyncResult was hanging. We can consider such an idea if performance requires it.
return merge(...this.sources.map(s => s.watch$(sequence$))).pipe(
return merge(this.source.watch$(sequence$)).pipe(
tap(update => console.log('PATCHDB - source updated:', update)),
concatMap(update =>
this.sequenceStore.update$(update).pipe(
concatMap(res => this.handleSyncResult$(res)),
),
),
concatMap(update => this.store.update$(update)),
finalize(() => {
console.log('FINALIZING')
this.sequenceStore.sequence = 0
}),
)
}
private handleSyncResult$ (res: Result): Observable<void> {
console.log('PATCHDB - handleSyncResult$(): ', res)
switch (res) {
case Result.DUMPED: return of(this.cancelStashTimeout.next('')) // cancel stash timeout
case Result.REVISED: return of(this.cancelStashTimeout.next('')) // cancel stash timeout
case Result.STASHED: return this.handleStashTimeout$() // call error after timeout
case Result.ERROR: return this.handlePatchError$() // call error immediately
default: return EMPTY
}
}
private handleStashTimeout$ (): Observable<void> {
console.log('PATCHDB - handleStashTimeout$()')
return timer(this.timeoutForMissingRevision).pipe(
tap(time => console.log('PATCHDB - timeout for missing patch:', time)),
takeUntil(this.cancelStashTimeout),
take(1),
concatMap(() => this.handlePatchError$()),
)
}
// Here flattened concatMaps are functionally equivalent to nested because the source observable emits at most once.
private handlePatchError$ (): Observable<void> {
return from(this.http.getDump()).pipe(
concatMap(dump => this.sequenceStore.update$(dump)),
// note the above is a "dump" update, which will always return DUMPED (it can't error)
// handleSyncResult will therefore never re-call handlePatchError()
concatMap(res => this.handleSyncResult$(res)),
catchError(e => {
console.error(e)
return EMPTY
console.log('PATCHDB - FINALIZING sync$()')
this.store.reset()
}),
)
}
}
export type PatchDbConfig<T> = {
http: Http<T>
sources: Source<T>[]
bootstrapper: Bootstrapper<T>
timeoutForMissingRevision?: number
}
export enum PatchOp {
ADD = 'add',
REMOVE = 'remove',
REPLACE = 'replace',
}
export interface Http<T> {
getRevisions (since: number): Promise<Revision[] | Dump<T>>
getDump (): Promise<Dump<T>>
}
export interface Bootstrapper<T> {
init (): Promise<DBCache<T>>
update (cache: DBCache<T>): Promise<void>
}
export interface DBCache<T>{
sequence: number,
data: T
}

View File

@@ -1,211 +0,0 @@
import { BehaviorSubject, Observable } from 'rxjs'
import { filter } from 'rxjs/operators'
import { Store } from './store'
import { DBCache } from './patch-db'
import { patchDocument } from './store'
import { ActionSerializer } from './action-serializer'
import { Operation } from 'fast-json-patch'
import BTree from 'sorted-btree'
export class SequenceStore<T extends object> {
private readonly lastState$: BehaviorSubject<DBCache<T>> = new BehaviorSubject(undefined as any)
private readonly actionSerializer = new ActionSerializer()
private preTemps: T
private stashed = new BTree<number, Revision>()
private temps: UpdateTemp[] = []
private sequence$ = new BehaviorSubject(0)
constructor (
readonly store: Store<T>,
initialSequence: number,
) {
const data = store.peek
this.preTemps = data
this.commit({ data, sequence: initialSequence }, [])
}
get sequence (): number { return this.sequence$.getValue() }
set sequence (seq: number) { this.sequence$.next(seq) }
// subscribe to watch$ to get sequence + T feed, e.g. for caching and bootstrapping from a cache
watch$ (): Observable<DBCache<T>> {
return this.lastState$.pipe(filter(a => !!a))
}
update$ (update: Update<T>): Observable<Result> {
return this.actionSerializer.run$(() => {
if (isTemp(update)) {
return this.updateTemp(update)
} else {
return this.updateReal(update)
}
})
}
viewRevisions (): Revision[] {
// return this.revisions.filter(a => !!a)
return this.stashed.valuesArray()
}
private updateReal (update: UpdateReal<T>): Result {
if (update.expireId) { this.temps = this.temps.filter(temp => temp.expiredBy !== update.expireId) }
if (update.id <= this.sequence) return Result.NOOP
const { result, dbCache, revisionsToDelete } = isDump(update) ?
this.dump(update) :
this.revise(update)
this.preTemps = dbCache.data
const afterTemps = this.stageSeqTemps(dbCache)
this.commit(afterTemps, revisionsToDelete)
return result
}
private updateTemp (update: UpdateTemp): Result {
this.temps.push(update)
const data = patchDocument(update.patch, this.store.peek)
const res = {
data,
sequence: this.sequence,
}
this.commit(res, [])
return Result.TEMP
}
private commit (res: DBCache<T>, revisionsToDelete: number[]): void {
const { data, sequence } = res
this.stashed.deleteKeys(revisionsToDelete)
this.sequence$.next(sequence)
this.store.set(data)
this.lastState$.next({ data, sequence })
}
private dump (dump: Dump<T>): { result: Result, dbCache: DBCache<T>, revisionsToDelete: number[] } {
try {
const oldRevisions = this.stashed.filter((key, _) => key < dump.id).keysArray()
const { dbCache, revisionsToDelete } = this.processRevisions(dump.value, dump.id)
return {
result: Result.DUMPED,
dbCache,
revisionsToDelete: oldRevisions.concat(revisionsToDelete),
}
} catch (e) {
console.error(`Dump error for ${JSON.stringify(dump)}: `, e)
return {
result: Result.ERROR,
dbCache: {
data: this.preTemps,
sequence: this.sequence,
},
revisionsToDelete: [],
}
}
}
private revise (revision: Revision): { result: Result, dbCache: DBCache<T>, revisionsToDelete: number[] } {
this.stashed.set(revision.id, revision)
try {
return this.processRevisions(this.preTemps, this.sequence)
} catch (e) {
console.error(`Revise error for ${JSON.stringify(revision)}: `, e)
return {
result: Result.ERROR,
dbCache: {
data: this.preTemps,
sequence: this.sequence,
},
revisionsToDelete: [],
}
}
}
private stageSeqTemps<S extends DBCache<T>> (resultSoFar: S): S {
return this.temps.reduce(({ data, ...rest }, nextTemp ) => {
try {
const nextContents = patchDocument(nextTemp.patch, data)
return { data: nextContents, ...rest } as S
} catch (e) {
console.error(`Skipping temporary patch ${JSON.stringify(nextTemp)} due to exception: `, e)
return { data, ...rest } as S
}
}, resultSoFar)
}
private processRevisions (data: T, sequence: number): { result: Result, dbCache: DBCache<T>, revisionsToDelete: number[] } {
const applicableRevisions = this.applicableRevisions(sequence)
console.log('APPLICABLE: ', applicableRevisions)
if (!applicableRevisions.length) {
return {
result: Result.STASHED,
dbCache: {
data,
sequence,
},
revisionsToDelete: [],
}
}
const revisionsToDelete: number[] = []
const toReturn = applicableRevisions.reduce(({ data, sequence }, revision) => {
const nextContents = patchDocument(revision.patch, data)
const nextSequence = sequence + 1
revisionsToDelete.push(revision.id) // @TODO original was `revisionsToDelete.concat([seqPatch.id])`, why?
return { data: nextContents, sequence: nextSequence }
}, { data, sequence })
return {
result: Result.REVISED,
dbCache: toReturn,
revisionsToDelete,
}
}
private applicableRevisions (sequence: number): Revision[] {
const toReturn = [] as Revision[]
let i = sequence
while (true) {
i++
const next = this.stashed.get(i)
if (next) {
toReturn.push(next)
} else {
break
}
}
return toReturn
}
}
export enum Result {
DUMPED = 'DUMPED', // store was dumped/replaced
REVISED = 'REVISED', // store was revised
TEMP = 'TEMP', // store was revised temporarily
STASHED = 'STASHED', // attempted to revise store but sequence too high. revision stashed for later
ERROR = 'ERROR', // attempted to revise/dump store, but failed
NOOP = 'NOOP', // sequence too low, update ignored
}
// revise a collection of nodes.
export type Revision = { id: number, patch: Operation[], expireId: string | null }
// dump/replace the entire store with T
export type Dump<T> = { id: number, value: T, expireId: string | null }
export type Update<T> = UpdateReal<T> | UpdateTemp
export type UpdateReal<T> = Revision | Dump<T>
export type UpdateTemp = Omit<Revision, 'id' | 'expireId'> & { expiredBy : string }
function isTemp<T> (s: Update<T>): s is UpdateTemp {
return !!(s as any).expiredBy
}
function isRevision<T> (s: Update<T>): s is Revision {
return !isTemp(s) && !!(s as any).patch
}
function isDump<T> (s: UpdateReal<T>): s is Dump<T> {
return !isTemp(s) && !!(s as any).value
}

View File

@@ -1,7 +1,6 @@
import { BehaviorSubject, concat, from, Observable, of } from 'rxjs'
import { catchError, concatMap, delay, skip, switchMap, take, tap } from 'rxjs/operators'
import { Http } from '../patch-db'
import { UpdateReal } from '../sequence-store'
import { Http, Update } from '../types'
import { Source } from './source'
export type PollConfig = {
@@ -15,7 +14,7 @@ export class PollSource<T> implements Source<T> {
private readonly http: Http<T>,
) { }
watch$ (sequence$: Observable<number>): Observable<UpdateReal<T>> {
watch$ (sequence$: Observable<number>): Observable<Update<T>> {
console.log('POLL_SOURCE - watch$()')
const polling$ = new BehaviorSubject('')

View File

@@ -1,5 +1,5 @@
import { Observable } from 'rxjs'
import { Update } from '../sequence-store'
import { Update } from '../types'
export interface Source<T> {
watch$ (sequence$?: Observable<number>): Observable<Update<T>>

View File

@@ -1,21 +1,27 @@
import { Observable } from 'rxjs'
import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'
import { UpdateReal } from '../sequence-store'
import { Update } from '../types'
import { Source } from './source'
export class WebsocketSource<T> implements Source<T> {
private websocket$: WebSocketSubject<UpdateReal<T>>
private websocket$: WebSocketSubject<Update<T>>
constructor (
readonly url: string,
) {
const fullConfig: WebSocketSubjectConfig<UpdateReal<T>> = {
const fullConfig: WebSocketSubjectConfig<Update<T>> = {
url,
openObserver: {
next: () => console.log('WebSocket connection open'),
next: () => {
console.log('WebSocket connection open')
this.websocket$.next('open message' as any)
},
},
closeObserver: {
next: () => console.log('WebSocket connection closed'),
next: () => {
console.log('WebSocket connection closed')
// @TODO re-open websocket on retry loop
},
},
closingObserver: {
next: () => console.log('Websocket subscription cancelled, websocket closing'),
@@ -24,5 +30,5 @@ export class WebsocketSource<T> implements Source<T> {
this.websocket$ = webSocket(fullConfig)
}
watch$ (): Observable<UpdateReal<T>> { return this.websocket$.asObservable() }
watch$ (): Observable<Update<T>> { return this.websocket$.asObservable() }
}

View File

@@ -1,17 +1,17 @@
import { from, Observable } from 'rxjs'
import { applyPatch, Operation } from 'fast-json-patch'
import { observable, runInAction } from 'mobx'
import { from, Observable, of } from 'rxjs'
import { toStream } from 'mobx-utils'
import { DBCache, Dump, Revision, Update } from './types'
import { applyPatch } from 'fast-json-patch'
export class Store<T extends object> {
private o: { data: T }
export class Store<T extends { }> {
cache: DBCache<T>
constructor (data: T) {
this.o = observable({ data })
constructor (
readonly initialCache: DBCache<T>,
) {
this.cache = initialCache
}
get peek (): T { return this.o.data }
watch$ (): Observable<T>
watch$<P1 extends keyof T> (p1: P1): Observable<T[P1]>
watch$<P1 extends keyof T, P2 extends keyof T[P1]> (p1: P1, p2: P2): Observable<T[P1][P2]>
@@ -20,29 +20,38 @@ export class Store<T extends object> {
watch$<P1 extends keyof T, P2 extends keyof T[P1], P3 extends keyof T[P1][P2], P4 extends keyof T[P1][P2][P3], P5 extends keyof T[P1][P2][P3][P4]> (p1: P1, p2: P2, p3: P3, p4: P4, p5: P5): Observable<T[P1][P2][P3][P4][P5]>
watch$<P1 extends keyof T, P2 extends keyof T[P1], P3 extends keyof T[P1][P2], P4 extends keyof T[P1][P2][P3], P5 extends keyof T[P1][P2][P3][P4], P6 extends keyof T[P1][P2][P3][P4][P5]> (p1: P1, p2: P2, p3: P3, p4: P4, p5: P5, p6: P6): Observable<T[P1][P2][P3][P4][P5][P6]>
watch$ (...args: (string | number)[]): Observable<any> {
return from(toStream(() => this.peekAccess(...args), true))
return from(toStream(() => this.peekNode(...args), true))
}
set (data: T): void {
runInAction(() => this.o.data = data)
watchAll$ (): Observable<DBCache<T>> {
return of(this.cache)
}
applyPatchDocument (patch: Operation[]): { oldDocument: T, newDocument: T } {
const oldDocument = this.o.data
const newDocument = patchDocument(patch, oldDocument)
this.set(newDocument)
return { oldDocument, newDocument }
update$ (update: Update<T>): Observable<DBCache<T>> {
console.log('UPDATE:', update)
if ((update as Revision).patch) {
if (this.cache.sequence + 1 !== update.id) throw new Error(`Outdated sequence: current: ${this.cache.sequence}, new: ${update.id}`)
applyPatch(this.cache.data, (update as Revision).patch, true, true)
} else {
this.cache.data = (update as Dump<T>).value
}
this.cache.sequence = update.id
return of(this.cache)
}
private peekAccess (...args: (string | number)[]): any {
reset (): void {
this.cache = {
sequence: 0,
data: { },
}
}
private peekNode (...args: (string | number)[]): any {
try {
return args.reduce((acc, next) => (acc as any)[`${next}`], this.o.data)
return args.reduce((acc, next) => (acc as any)[`${next}`], this.cache.data)
} catch (e) {
return undefined
}
}
}
export function patchDocument<T> (patch: Operation[], doc: T): T {
return applyPatch(doc, patch, true, false).newDocument
}

29
client/lib/types.ts Normal file
View File

@@ -0,0 +1,29 @@
import { Operation } from 'fast-json-patch'
// revise a collection of nodes.
export type Revision = { id: number, patch: Operation[], expireId: string | null }
// dump/replace the entire store with T
export type Dump<T> = { id: number, value: T, expireId: string | null }
export type Update<T> = Revision | Dump<T>
export enum PatchOp {
ADD = 'add',
REMOVE = 'remove',
REPLACE = 'replace',
}
export interface Http<T> {
getRevisions (since: number): Promise<Revision[] | Dump<T>>
getDump (): Promise<Dump<T>>
}
export interface Bootstrapper<T> {
init (): Promise<DBCache<T>>
update (cache: DBCache<T>): Promise<void>
}
export interface DBCache<T>{
sequence: number,
data: T | { }
}