From eb4a24e797ea68e1c702f92554b078cea80909a7 Mon Sep 17 00:00:00 2001 From: Matt Hill Date: Thu, 1 Jul 2021 16:18:26 -0600 Subject: [PATCH] add connection monitoring --- client/lib/patch-db.ts | 5 +++-- client/lib/source/poll-source.ts | 11 ++++------- client/lib/source/source.ts | 5 +++-- client/lib/source/ws-source.ts | 26 ++++++++++++++++---------- client/lib/types.ts | 6 ++++++ 5 files changed, 32 insertions(+), 21 deletions(-) diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index 95f4c7c..c597ffa 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -1,4 +1,4 @@ -import { merge, Observable, of } from 'rxjs' +import { Observable } from 'rxjs' import { concatMap, finalize, map, tap } from 'rxjs/operators' import { Source } from './source/source' import { Store } from './store' @@ -7,6 +7,7 @@ export { Operation } from 'fast-json-patch' export class PatchDB { store: Store + connectionStatus$ = this.source.connectionStatus$ constructor ( private readonly source: Source, @@ -21,7 +22,7 @@ export class PatchDB { const sequence$ = this.store.watchAll$().pipe(map(cache => cache.sequence)) // nested concatMaps, as it is written, ensure sync is not run for update2 until handleSyncResult is complete for update1. // flat concatMaps would allow many syncs to run while handleSyncResult was hanging. We can consider such an idea if performance requires it. - return merge(this.source.watch$(sequence$)).pipe( + return this.source.watch$(sequence$).pipe( tap(update => console.log('PATCHDB - source updated:', update)), concatMap(update => this.store.update$(update)), finalize(() => { diff --git a/client/lib/source/poll-source.ts b/client/lib/source/poll-source.ts index 468706e..ae693e6 100644 --- a/client/lib/source/poll-source.ts +++ b/client/lib/source/poll-source.ts @@ -1,6 +1,6 @@ import { BehaviorSubject, concat, from, Observable, of } from 'rxjs' import { catchError, concatMap, delay, skip, switchMap, take, tap } from 'rxjs/operators' -import { Http, Update } from '../types' +import { ConnectionStatus, Http, Update } from '../types' import { Source } from './source' export type PollConfig = { @@ -8,6 +8,7 @@ export type PollConfig = { } export class PollSource implements Source { + connectionStatus$ = new BehaviorSubject(ConnectionStatus.Initializing) constructor ( private readonly pollConfig: PollConfig, @@ -15,26 +16,22 @@ export class PollSource implements Source { ) { } watch$ (sequence$: Observable): Observable> { - console.log('POLL_SOURCE - watch$()') - const polling$ = new BehaviorSubject('') const updates$ = of('').pipe( concatMap(_ => sequence$), take(1), - tap(_ => console.log('making request')), concatMap(seq => this.http.getRevisions(seq)), + tap(_ => this.connectionStatus$.next(ConnectionStatus.Connected)), catchError(e => { console.error(e) + this.connectionStatus$.next(ConnectionStatus.Disconnected) return of([]) }), - tap(_ => console.log('request complete')), ) const delay$ = of([]).pipe( - tap(_ => console.log('starting cooldown')), delay(this.pollConfig.cooldown), - tap(_ => console.log('cooldown finished')), tap(_ => polling$.next('')), skip(1), ) diff --git a/client/lib/source/source.ts b/client/lib/source/source.ts index ae4cccd..e834267 100644 --- a/client/lib/source/source.ts +++ b/client/lib/source/source.ts @@ -1,6 +1,7 @@ -import { Observable } from 'rxjs' -import { Update } from '../types' +import { BehaviorSubject, Observable } from 'rxjs' +import { ConnectionStatus, Update } from '../types' export interface Source { + connectionStatus$: BehaviorSubject watch$ (sequence$?: Observable): Observable> } diff --git a/client/lib/source/ws-source.ts b/client/lib/source/ws-source.ts index f7d5796..b68565c 100644 --- a/client/lib/source/ws-source.ts +++ b/client/lib/source/ws-source.ts @@ -1,34 +1,40 @@ -import { Observable } from 'rxjs' +import { BehaviorSubject, Observable } from 'rxjs' import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket' -import { Update } from '../types' +import { ConnectionStatus, Update } from '../types' import { Source } from './source' export class WebsocketSource implements Source { - private websocket$: WebSocketSubject> + connectionStatus$ = new BehaviorSubject(ConnectionStatus.Initializing) + private websocket$: WebSocketSubject> | undefined constructor ( - readonly url: string, + private readonly url: string, ) { + } + + watch$ (): Observable> { const fullConfig: WebSocketSubjectConfig> = { - url, + url: this.url, openObserver: { next: () => { console.log('WebSocket connection open') - this.websocket$.next('open message' as any) + this.connectionStatus$.next(ConnectionStatus.Connected) + this.websocket$!.next('open message' as any) }, }, closeObserver: { next: () => { + this.connectionStatus$.next(ConnectionStatus.Disconnected) console.log('WebSocket connection closed') - // @TODO re-open websocket on retry loop }, }, closingObserver: { - next: () => console.log('Websocket subscription cancelled, websocket closing'), + next: () => { + console.log('Websocket subscription cancelled, websocket closing') + }, }, } this.websocket$ = webSocket(fullConfig) + return this.websocket$ } - - watch$ (): Observable> { return this.websocket$.asObservable() } } diff --git a/client/lib/types.ts b/client/lib/types.ts index 4979f7d..a3155c5 100644 --- a/client/lib/types.ts +++ b/client/lib/types.ts @@ -26,4 +26,10 @@ export interface Bootstrapper { export interface DBCache{ sequence: number, data: T | { } +} + +export enum ConnectionStatus { + Initializing = 'initializing', + Connected = 'connected', + Disconnected = 'disconnected', } \ No newline at end of file