import { BehaviorSubject, Observable } from 'rxjs' import { filter } from 'rxjs/operators' import { Store } from './store' import { DBCache } from './patch-db' import { patchDocument } from './store' import { ActionSerializer } from './action-serializer' import { Operation } from 'fast-json-patch' import BTree from 'sorted-btree' export class SequenceStore { private readonly lastState$: BehaviorSubject> = new BehaviorSubject(undefined as any) private readonly actionSerializer = new ActionSerializer() private preTemps: T private stashed = new BTree() private temps: UpdateTemp[] = [] private sequence$ = new BehaviorSubject(0) constructor ( readonly store: Store, initialSequence: number, ) { const data = store.peek this.preTemps = data this.commit({ data, sequence: initialSequence }, []) } get sequence (): number { return this.sequence$.getValue() } set sequence (seq: number) { this.sequence$.next(seq) } // subscribe to watch$ to get sequence + T feed, e.g. for caching and bootstrapping from a cache watch$ (): Observable> { return this.lastState$.pipe(filter(a => !!a)) } update$ (update: Update): Observable { return this.actionSerializer.run$(() => { if (isTemp(update)) { return this.updateTemp(update) } else { return this.updateReal(update) } }) } viewRevisions (): Revision[] { // return this.revisions.filter(a => !!a) return this.stashed.valuesArray() } private updateReal (update: UpdateReal): Result { if (update.expireId) { this.temps = this.temps.filter(temp => temp.expiredBy !== update.expireId) } if (update.id <= this.sequence) return Result.NOOP const { result, dbCache, revisionsToDelete } = isDump(update) ? this.dump(update) : this.revise(update) this.preTemps = dbCache.data const afterTemps = this.stageSeqTemps(dbCache) this.commit(afterTemps, revisionsToDelete) return result } private updateTemp (update: UpdateTemp): Result { this.temps.push(update) const data = patchDocument(update.patch, this.store.peek) const res = { data, sequence: this.sequence, } this.commit(res, []) return Result.TEMP } private commit (res: DBCache, revisionsToDelete: number[]): void { const { data, sequence } = res this.stashed.deleteKeys(revisionsToDelete) this.sequence$.next(sequence) this.store.set(data) this.lastState$.next({ data, sequence }) } private dump (dump: Dump): { result: Result, dbCache: DBCache, revisionsToDelete: number[] } { try { const oldRevisions = this.stashed.filter((key, _) => key < dump.id).keysArray() const { dbCache, revisionsToDelete } = this.processRevisions(dump.value, dump.id) return { result: Result.DUMPED, dbCache, revisionsToDelete: oldRevisions.concat(revisionsToDelete), } } catch (e) { console.error(`Dump error for ${JSON.stringify(dump)}: `, e) return { result: Result.ERROR, dbCache: { data: this.preTemps, sequence: this.sequence, }, revisionsToDelete: [], } } } private revise (revision: Revision): { result: Result, dbCache: DBCache, revisionsToDelete: number[] } { this.stashed.set(revision.id, revision) try { return this.processRevisions(this.preTemps, this.sequence) } catch (e) { console.error(`Revise error for ${JSON.stringify(revision)}: `, e) return { result: Result.ERROR, dbCache: { data: this.preTemps, sequence: this.sequence, }, revisionsToDelete: [], } } } private stageSeqTemps> (resultSoFar: S): S { return this.temps.reduce(({ data, ...rest }, nextTemp ) => { try { const nextContents = patchDocument(nextTemp.patch, data) return { data: nextContents, ...rest } as S } catch (e) { console.error(`Skipping temporary patch ${JSON.stringify(nextTemp)} due to exception: `, e) return { data, ...rest } as S } }, resultSoFar) } private processRevisions (data: T, sequence: number): { result: Result, dbCache: DBCache, revisionsToDelete: number[] } { const applicableRevisions = this.applicableRevisions(sequence) console.log('APPLICABLE: ', applicableRevisions) if (!applicableRevisions.length) { return { result: Result.STASHED, dbCache: { data, sequence, }, revisionsToDelete: [], } } const revisionsToDelete: number[] = [] const toReturn = applicableRevisions.reduce(({ data, sequence }, revision) => { const nextContents = patchDocument(revision.patch, data) const nextSequence = sequence + 1 revisionsToDelete.push(revision.id) // @TODO original was `revisionsToDelete.concat([seqPatch.id])`, why? return { data: nextContents, sequence: nextSequence } }, { data, sequence }) return { result: Result.REVISED, dbCache: toReturn, revisionsToDelete, } } private applicableRevisions (sequence: number): Revision[] { const toReturn = [] as Revision[] let i = sequence while (true) { i++ const next = this.stashed.get(i) if (next) { toReturn.push(next) } else { break } } return toReturn } } export enum Result { DUMPED = 'DUMPED', // store was dumped/replaced REVISED = 'REVISED', // store was revised TEMP = 'TEMP', // store was revised temporarily STASHED = 'STASHED', // attempted to revise store but sequence too high. revision stashed for later ERROR = 'ERROR', // attempted to revise/dump store, but failed NOOP = 'NOOP', // sequence too low, update ignored } // revise a collection of nodes. export type Revision = { id: number, patch: Operation[], expireId: string | null } // dump/replace the entire store with T export type Dump = { id: number, value: T, expireId: string | null } export type Update = UpdateReal | UpdateTemp export type UpdateReal = Revision | Dump export type UpdateTemp = Omit & { expiredBy : string } function isTemp (s: Update): s is UpdateTemp { return !!(s as any).expiredBy } function isRevision (s: Update): s is Revision { return !isTemp(s) && !!(s as any).patch } function isDump (s: UpdateReal): s is Dump { return !isTemp(s) && !!(s as any).value }