From 434c6f70378e0b4d9356f5800d93ab0766c4746b Mon Sep 17 00:00:00 2001 From: Matt Hill Date: Tue, 6 Jul 2021 16:56:15 -0600 Subject: [PATCH] start --- client/lib/patch-db.ts | 10 ++++---- client/lib/source/poll-source.ts | 14 ++++-------- client/lib/store.ts | 39 ++++++++++++-------------------- client/lib/types.ts | 2 +- 4 files changed, 25 insertions(+), 40 deletions(-) diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index 4206d84..5eccd34 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -1,4 +1,4 @@ -import { merge, Observable } from 'rxjs' +import { merge, Observable, of } from 'rxjs' import { concatMap, finalize, tap } from 'rxjs/operators' import { Source } from './source/source' import { Store } from './store' @@ -6,21 +6,21 @@ import { DBCache } from './types' export { Operation } from 'fast-json-patch' -export class PatchDB { +export class PatchDB { store: Store constructor ( private readonly sources: Source[], - readonly cache: DBCache, + private readonly initialCache: DBCache, ) { - this.store = new Store(cache) + this.store = new Store(this.initialCache) } sync$ (): Observable> { return merge(...this.sources.map(s => s.watch$(this.store))) .pipe( tap(update => this.store.update(update)), - concatMap(() => this.store.watchCache$()), + concatMap(() => of(this.store.cache)), finalize(() => { this.store.reset() }), diff --git a/client/lib/source/poll-source.ts b/client/lib/source/poll-source.ts index 0679e12..d209dac 100644 --- a/client/lib/source/poll-source.ts +++ b/client/lib/source/poll-source.ts @@ -1,5 +1,5 @@ import { BehaviorSubject, concat, from, Observable, of } from 'rxjs' -import { concatMap, delay, map, skip, switchMap, take, tap } from 'rxjs/operators' +import { concatMap, delay, skip, switchMap, take, tap } from 'rxjs/operators' import { Store } from '../store' import { Http, Update } from '../types' import { Source } from './source' @@ -16,17 +16,13 @@ export class PollSource implements Source { ) { } watch$ (store: Store): Observable> { - const sequence$ = store.watchCache$() - .pipe( - map(cache => cache.sequence), - ) - const polling$ = new BehaviorSubject('') - const updates$ = of('').pipe( - concatMap(_ => sequence$), - take(1), + const updates$ = of({ }) + .pipe( + concatMap(_ => store.sequence$), concatMap(seq => this.http.getRevisions(seq)), + take(1), ) const delay$ = of([]).pipe( diff --git a/client/lib/store.ts b/client/lib/store.ts index 5fc78db..61dd9e9 100644 --- a/client/lib/store.ts +++ b/client/lib/store.ts @@ -1,20 +1,17 @@ -import { BehaviorSubject, from, Observable } from 'rxjs' -import { observable } from 'mobx' -import { toStream } from 'mobx-utils' import { DBCache, Dump, Revision, Update } from './types' import { applyPatch } from 'fast-json-patch' +import { BehaviorSubject, from, Observable } from 'rxjs' +import { toStream } from 'mobx-utils' export class Store { - sequence: number - o: { data: T | { } } - cache$: BehaviorSubject> + cache: DBCache + sequence$: BehaviorSubject constructor ( readonly initialCache: DBCache, ) { - this.sequence = initialCache.sequence - this.o = observable({ data: this.initialCache.data }) - this.cache$ = new BehaviorSubject(initialCache) + this.cache = initialCache + this.sequence$ = new BehaviorSubject(initialCache.sequence) } watch$ (): Observable @@ -28,33 +25,25 @@ export class Store { return from(toStream(() => this.peekNode(...args), true)) } - watchCache$ (): Observable> { - return this.cache$.asObservable() - } - update (update: Update): void { if ((update as Revision).patch) { - if (this.sequence + 1 !== update.id) throw new Error(`Outdated sequence: current: ${this.sequence}, new: ${update.id}`) - applyPatch(this.o.data, (update as Revision).patch, true, true) + if (this.cache.sequence + 1 !== update.id) throw new Error(`Outdated sequence: current: ${this.cache.sequence}, new: ${update.id}`) + applyPatch(this.cache.data, (update as Revision).patch, true, true) } else { - this.o.data = (update as Dump).value + this.cache.data = (update as Dump).value } - - this.sequence = update.id - - this.cache$.next({ sequence: this.sequence, data: this.o.data }) + this.cache.sequence = update.id + this.sequence$.next(this.cache.sequence) } reset (): void { - this.cache$.next({ - sequence: 0, - data: { }, - }) + this.cache.sequence = 0 + this.cache.data = { } as T } private peekNode (...args: (string | number)[]): any { try { - return args.reduce((acc, next) => (acc as any)[`${next}`], this.o.data) + return args.reduce((acc, next) => (acc as any)[`${next}`], this.cache.data) } catch (e) { return undefined } diff --git a/client/lib/types.ts b/client/lib/types.ts index 004a969..537b6b0 100644 --- a/client/lib/types.ts +++ b/client/lib/types.ts @@ -25,5 +25,5 @@ export interface Bootstrapper { export interface DBCache{ sequence: number, - data: T | { } + data: T }