add connection monitoring

This commit is contained in:
Matt Hill
2021-07-01 16:18:26 -06:00
committed by Aiden McClelland
parent 4fedd74b26
commit eb4a24e797
5 changed files with 32 additions and 21 deletions

View File

@@ -1,4 +1,4 @@
import { merge, Observable, of } from 'rxjs' import { Observable } from 'rxjs'
import { concatMap, finalize, map, tap } from 'rxjs/operators' import { concatMap, finalize, map, tap } from 'rxjs/operators'
import { Source } from './source/source' import { Source } from './source/source'
import { Store } from './store' import { Store } from './store'
@@ -7,6 +7,7 @@ export { Operation } from 'fast-json-patch'
export class PatchDB<T extends object> { export class PatchDB<T extends object> {
store: Store<T> store: Store<T>
connectionStatus$ = this.source.connectionStatus$
constructor ( constructor (
private readonly source: Source<T>, private readonly source: Source<T>,
@@ -21,7 +22,7 @@ export class PatchDB<T extends object> {
const sequence$ = this.store.watchAll$().pipe(map(cache => cache.sequence)) const sequence$ = this.store.watchAll$().pipe(map(cache => cache.sequence))
// nested concatMaps, as it is written, ensure sync is not run for update2 until handleSyncResult is complete for update1. // nested concatMaps, as it is written, ensure sync is not run for update2 until handleSyncResult is complete for update1.
// flat concatMaps would allow many syncs to run while handleSyncResult was hanging. We can consider such an idea if performance requires it. // flat concatMaps would allow many syncs to run while handleSyncResult was hanging. We can consider such an idea if performance requires it.
return merge(this.source.watch$(sequence$)).pipe( return this.source.watch$(sequence$).pipe(
tap(update => console.log('PATCHDB - source updated:', update)), tap(update => console.log('PATCHDB - source updated:', update)),
concatMap(update => this.store.update$(update)), concatMap(update => this.store.update$(update)),
finalize(() => { finalize(() => {

View File

@@ -1,6 +1,6 @@
import { BehaviorSubject, concat, from, Observable, of } from 'rxjs' import { BehaviorSubject, concat, from, Observable, of } from 'rxjs'
import { catchError, concatMap, delay, skip, switchMap, take, tap } from 'rxjs/operators' import { catchError, concatMap, delay, skip, switchMap, take, tap } from 'rxjs/operators'
import { Http, Update } from '../types' import { ConnectionStatus, Http, Update } from '../types'
import { Source } from './source' import { Source } from './source'
export type PollConfig = { export type PollConfig = {
@@ -8,6 +8,7 @@ export type PollConfig = {
} }
export class PollSource<T> implements Source<T> { export class PollSource<T> implements Source<T> {
connectionStatus$ = new BehaviorSubject(ConnectionStatus.Initializing)
constructor ( constructor (
private readonly pollConfig: PollConfig, private readonly pollConfig: PollConfig,
@@ -15,26 +16,22 @@ export class PollSource<T> implements Source<T> {
) { } ) { }
watch$ (sequence$: Observable<number>): Observable<Update<T>> { watch$ (sequence$: Observable<number>): Observable<Update<T>> {
console.log('POLL_SOURCE - watch$()')
const polling$ = new BehaviorSubject('') const polling$ = new BehaviorSubject('')
const updates$ = of('').pipe( const updates$ = of('').pipe(
concatMap(_ => sequence$), concatMap(_ => sequence$),
take(1), take(1),
tap(_ => console.log('making request')),
concatMap(seq => this.http.getRevisions(seq)), concatMap(seq => this.http.getRevisions(seq)),
tap(_ => this.connectionStatus$.next(ConnectionStatus.Connected)),
catchError(e => { catchError(e => {
console.error(e) console.error(e)
this.connectionStatus$.next(ConnectionStatus.Disconnected)
return of([]) return of([])
}), }),
tap(_ => console.log('request complete')),
) )
const delay$ = of([]).pipe( const delay$ = of([]).pipe(
tap(_ => console.log('starting cooldown')),
delay(this.pollConfig.cooldown), delay(this.pollConfig.cooldown),
tap(_ => console.log('cooldown finished')),
tap(_ => polling$.next('')), tap(_ => polling$.next('')),
skip(1), skip(1),
) )

View File

@@ -1,6 +1,7 @@
import { Observable } from 'rxjs' import { BehaviorSubject, Observable } from 'rxjs'
import { Update } from '../types' import { ConnectionStatus, Update } from '../types'
export interface Source<T> { export interface Source<T> {
connectionStatus$: BehaviorSubject<ConnectionStatus>
watch$ (sequence$?: Observable<number>): Observable<Update<T>> watch$ (sequence$?: Observable<number>): Observable<Update<T>>
} }

View File

@@ -1,34 +1,40 @@
import { Observable } from 'rxjs' import { BehaviorSubject, Observable } from 'rxjs'
import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket' import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'
import { Update } from '../types' import { ConnectionStatus, Update } from '../types'
import { Source } from './source' import { Source } from './source'
export class WebsocketSource<T> implements Source<T> { export class WebsocketSource<T> implements Source<T> {
private websocket$: WebSocketSubject<Update<T>> connectionStatus$ = new BehaviorSubject(ConnectionStatus.Initializing)
private websocket$: WebSocketSubject<Update<T>> | undefined
constructor ( constructor (
readonly url: string, private readonly url: string,
) { ) {
}
watch$ (): Observable<Update<T>> {
const fullConfig: WebSocketSubjectConfig<Update<T>> = { const fullConfig: WebSocketSubjectConfig<Update<T>> = {
url, url: this.url,
openObserver: { openObserver: {
next: () => { next: () => {
console.log('WebSocket connection open') console.log('WebSocket connection open')
this.websocket$.next('open message' as any) this.connectionStatus$.next(ConnectionStatus.Connected)
this.websocket$!.next('open message' as any)
}, },
}, },
closeObserver: { closeObserver: {
next: () => { next: () => {
this.connectionStatus$.next(ConnectionStatus.Disconnected)
console.log('WebSocket connection closed') console.log('WebSocket connection closed')
// @TODO re-open websocket on retry loop
}, },
}, },
closingObserver: { closingObserver: {
next: () => console.log('Websocket subscription cancelled, websocket closing'), next: () => {
console.log('Websocket subscription cancelled, websocket closing')
},
}, },
} }
this.websocket$ = webSocket(fullConfig) this.websocket$ = webSocket(fullConfig)
return this.websocket$
} }
watch$ (): Observable<Update<T>> { return this.websocket$.asObservable() }
} }

View File

@@ -27,3 +27,9 @@ export interface DBCache<T>{
sequence: number, sequence: number,
data: T | { } data: T | { }
} }
export enum ConnectionStatus {
Initializing = 'initializing',
Connected = 'connected',
Disconnected = 'disconnected',
}