From d84752007dd159cb99d9772f770e77c0510af701 Mon Sep 17 00:00:00 2001 From: Matt Hill Date: Mon, 5 Jul 2021 20:36:44 -0600 Subject: [PATCH] remove connection status from lib --- client/lib/patch-db.ts | 21 +++++++----------- client/lib/source/poll-source.ts | 19 ++++++++-------- client/lib/source/source.ts | 8 +++---- client/lib/source/ws-source.ts | 20 +++-------------- client/lib/store.ts | 37 ++++++++++++++++++-------------- client/lib/types.ts | 6 ------ 6 files changed, 45 insertions(+), 66 deletions(-) diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index c597ffa..4206d84 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -1,32 +1,27 @@ -import { Observable } from 'rxjs' -import { concatMap, finalize, map, tap } from 'rxjs/operators' +import { merge, Observable } from 'rxjs' +import { concatMap, finalize, tap } from 'rxjs/operators' import { Source } from './source/source' import { Store } from './store' import { DBCache } from './types' + export { Operation } from 'fast-json-patch' export class PatchDB { store: Store - connectionStatus$ = this.source.connectionStatus$ constructor ( - private readonly source: Source, + private readonly sources: Source[], readonly cache: DBCache, ) { this.store = new Store(cache) } sync$ (): Observable> { - console.log('PATCHDB - sync$()') - - const sequence$ = this.store.watchAll$().pipe(map(cache => cache.sequence)) - // nested concatMaps, as it is written, ensure sync is not run for update2 until handleSyncResult is complete for update1. - // flat concatMaps would allow many syncs to run while handleSyncResult was hanging. We can consider such an idea if performance requires it. - return this.source.watch$(sequence$).pipe( - tap(update => console.log('PATCHDB - source updated:', update)), - concatMap(update => this.store.update$(update)), + return merge(...this.sources.map(s => s.watch$(this.store))) + .pipe( + tap(update => this.store.update(update)), + concatMap(() => this.store.watchCache$()), finalize(() => { - console.log('PATCHDB - FINALIZING sync$()') this.store.reset() }), ) diff --git a/client/lib/source/poll-source.ts b/client/lib/source/poll-source.ts index ae693e6..0679e12 100644 --- a/client/lib/source/poll-source.ts +++ b/client/lib/source/poll-source.ts @@ -1,6 +1,7 @@ import { BehaviorSubject, concat, from, Observable, of } from 'rxjs' -import { catchError, concatMap, delay, skip, switchMap, take, tap } from 'rxjs/operators' -import { ConnectionStatus, Http, Update } from '../types' +import { concatMap, delay, map, skip, switchMap, take, tap } from 'rxjs/operators' +import { Store } from '../store' +import { Http, Update } from '../types' import { Source } from './source' export type PollConfig = { @@ -8,26 +9,24 @@ export type PollConfig = { } export class PollSource implements Source { - connectionStatus$ = new BehaviorSubject(ConnectionStatus.Initializing) constructor ( private readonly pollConfig: PollConfig, private readonly http: Http, ) { } - watch$ (sequence$: Observable): Observable> { + watch$ (store: Store): Observable> { + const sequence$ = store.watchCache$() + .pipe( + map(cache => cache.sequence), + ) + const polling$ = new BehaviorSubject('') const updates$ = of('').pipe( concatMap(_ => sequence$), take(1), concatMap(seq => this.http.getRevisions(seq)), - tap(_ => this.connectionStatus$.next(ConnectionStatus.Connected)), - catchError(e => { - console.error(e) - this.connectionStatus$.next(ConnectionStatus.Disconnected) - return of([]) - }), ) const delay$ = of([]).pipe( diff --git a/client/lib/source/source.ts b/client/lib/source/source.ts index e834267..45cf1b8 100644 --- a/client/lib/source/source.ts +++ b/client/lib/source/source.ts @@ -1,7 +1,7 @@ -import { BehaviorSubject, Observable } from 'rxjs' -import { ConnectionStatus, Update } from '../types' +import { Observable } from 'rxjs' +import { Store } from '../store' +import { Update } from '../types' export interface Source { - connectionStatus$: BehaviorSubject - watch$ (sequence$?: Observable): Observable> + watch$ (store?: Store): Observable> } diff --git a/client/lib/source/ws-source.ts b/client/lib/source/ws-source.ts index b68565c..0fc0fa0 100644 --- a/client/lib/source/ws-source.ts +++ b/client/lib/source/ws-source.ts @@ -1,16 +1,14 @@ -import { BehaviorSubject, Observable } from 'rxjs' +import { Observable } from 'rxjs' import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket' -import { ConnectionStatus, Update } from '../types' +import { Update } from '../types' import { Source } from './source' export class WebsocketSource implements Source { - connectionStatus$ = new BehaviorSubject(ConnectionStatus.Initializing) private websocket$: WebSocketSubject> | undefined constructor ( private readonly url: string, - ) { - } + ) { } watch$ (): Observable> { const fullConfig: WebSocketSubjectConfig> = { @@ -18,21 +16,9 @@ export class WebsocketSource implements Source { openObserver: { next: () => { console.log('WebSocket connection open') - this.connectionStatus$.next(ConnectionStatus.Connected) this.websocket$!.next('open message' as any) }, }, - closeObserver: { - next: () => { - this.connectionStatus$.next(ConnectionStatus.Disconnected) - console.log('WebSocket connection closed') - }, - }, - closingObserver: { - next: () => { - console.log('Websocket subscription cancelled, websocket closing') - }, - }, } this.websocket$ = webSocket(fullConfig) return this.websocket$ diff --git a/client/lib/store.ts b/client/lib/store.ts index b056f8b..5fc78db 100644 --- a/client/lib/store.ts +++ b/client/lib/store.ts @@ -1,15 +1,20 @@ -import { from, Observable, of } from 'rxjs' +import { BehaviorSubject, from, Observable } from 'rxjs' +import { observable } from 'mobx' import { toStream } from 'mobx-utils' import { DBCache, Dump, Revision, Update } from './types' import { applyPatch } from 'fast-json-patch' -export class Store { - cache: DBCache +export class Store { + sequence: number + o: { data: T | { } } + cache$: BehaviorSubject> constructor ( readonly initialCache: DBCache, ) { - this.cache = initialCache + this.sequence = initialCache.sequence + this.o = observable({ data: this.initialCache.data }) + this.cache$ = new BehaviorSubject(initialCache) } watch$ (): Observable @@ -23,33 +28,33 @@ export class Store { return from(toStream(() => this.peekNode(...args), true)) } - watchAll$ (): Observable> { - return of(this.cache) + watchCache$ (): Observable> { + return this.cache$.asObservable() } - update$ (update: Update): Observable> { - console.log('UPDATE:', update) + update (update: Update): void { if ((update as Revision).patch) { - if (this.cache.sequence + 1 !== update.id) throw new Error(`Outdated sequence: current: ${this.cache.sequence}, new: ${update.id}`) - applyPatch(this.cache.data, (update as Revision).patch, true, true) + if (this.sequence + 1 !== update.id) throw new Error(`Outdated sequence: current: ${this.sequence}, new: ${update.id}`) + applyPatch(this.o.data, (update as Revision).patch, true, true) } else { - this.cache.data = (update as Dump).value + this.o.data = (update as Dump).value } - this.cache.sequence = update.id - return of(this.cache) + this.sequence = update.id + + this.cache$.next({ sequence: this.sequence, data: this.o.data }) } reset (): void { - this.cache = { + this.cache$.next({ sequence: 0, data: { }, - } + }) } private peekNode (...args: (string | number)[]): any { try { - return args.reduce((acc, next) => (acc as any)[`${next}`], this.cache.data) + return args.reduce((acc, next) => (acc as any)[`${next}`], this.o.data) } catch (e) { return undefined } diff --git a/client/lib/types.ts b/client/lib/types.ts index a3155c5..004a969 100644 --- a/client/lib/types.ts +++ b/client/lib/types.ts @@ -27,9 +27,3 @@ export interface DBCache{ sequence: number, data: T | { } } - -export enum ConnectionStatus { - Initializing = 'initializing', - Connected = 'connected', - Disconnected = 'disconnected', -} \ No newline at end of file