add connection monitoring

This commit is contained in:
Matt Hill
2021-07-01 16:18:26 -06:00
committed by Aiden McClelland
parent 06d8a6f76f
commit 80698e8228
5 changed files with 32 additions and 21 deletions

View File

@@ -1,4 +1,4 @@
import { merge, Observable, of } from 'rxjs'
import { Observable } from 'rxjs'
import { concatMap, finalize, map, tap } from 'rxjs/operators'
import { Source } from './source/source'
import { Store } from './store'
@@ -7,6 +7,7 @@ export { Operation } from 'fast-json-patch'
export class PatchDB<T extends object> {
store: Store<T>
connectionStatus$ = this.source.connectionStatus$
constructor (
private readonly source: Source<T>,
@@ -21,7 +22,7 @@ export class PatchDB<T extends object> {
const sequence$ = this.store.watchAll$().pipe(map(cache => cache.sequence))
// nested concatMaps, as it is written, ensure sync is not run for update2 until handleSyncResult is complete for update1.
// flat concatMaps would allow many syncs to run while handleSyncResult was hanging. We can consider such an idea if performance requires it.
return merge(this.source.watch$(sequence$)).pipe(
return this.source.watch$(sequence$).pipe(
tap(update => console.log('PATCHDB - source updated:', update)),
concatMap(update => this.store.update$(update)),
finalize(() => {

View File

@@ -1,6 +1,6 @@
import { BehaviorSubject, concat, from, Observable, of } from 'rxjs'
import { catchError, concatMap, delay, skip, switchMap, take, tap } from 'rxjs/operators'
import { Http, Update } from '../types'
import { ConnectionStatus, Http, Update } from '../types'
import { Source } from './source'
export type PollConfig = {
@@ -8,6 +8,7 @@ export type PollConfig = {
}
export class PollSource<T> implements Source<T> {
connectionStatus$ = new BehaviorSubject(ConnectionStatus.Initializing)
constructor (
private readonly pollConfig: PollConfig,
@@ -15,26 +16,22 @@ export class PollSource<T> implements Source<T> {
) { }
watch$ (sequence$: Observable<number>): Observable<Update<T>> {
console.log('POLL_SOURCE - watch$()')
const polling$ = new BehaviorSubject('')
const updates$ = of('').pipe(
concatMap(_ => sequence$),
take(1),
tap(_ => console.log('making request')),
concatMap(seq => this.http.getRevisions(seq)),
tap(_ => this.connectionStatus$.next(ConnectionStatus.Connected)),
catchError(e => {
console.error(e)
this.connectionStatus$.next(ConnectionStatus.Disconnected)
return of([])
}),
tap(_ => console.log('request complete')),
)
const delay$ = of([]).pipe(
tap(_ => console.log('starting cooldown')),
delay(this.pollConfig.cooldown),
tap(_ => console.log('cooldown finished')),
tap(_ => polling$.next('')),
skip(1),
)

View File

@@ -1,6 +1,7 @@
import { Observable } from 'rxjs'
import { Update } from '../types'
import { BehaviorSubject, Observable } from 'rxjs'
import { ConnectionStatus, Update } from '../types'
export interface Source<T> {
connectionStatus$: BehaviorSubject<ConnectionStatus>
watch$ (sequence$?: Observable<number>): Observable<Update<T>>
}

View File

@@ -1,34 +1,40 @@
import { Observable } from 'rxjs'
import { BehaviorSubject, Observable } from 'rxjs'
import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'
import { Update } from '../types'
import { ConnectionStatus, Update } from '../types'
import { Source } from './source'
export class WebsocketSource<T> implements Source<T> {
private websocket$: WebSocketSubject<Update<T>>
connectionStatus$ = new BehaviorSubject(ConnectionStatus.Initializing)
private websocket$: WebSocketSubject<Update<T>> | undefined
constructor (
readonly url: string,
private readonly url: string,
) {
}
watch$ (): Observable<Update<T>> {
const fullConfig: WebSocketSubjectConfig<Update<T>> = {
url,
url: this.url,
openObserver: {
next: () => {
console.log('WebSocket connection open')
this.websocket$.next('open message' as any)
this.connectionStatus$.next(ConnectionStatus.Connected)
this.websocket$!.next('open message' as any)
},
},
closeObserver: {
next: () => {
this.connectionStatus$.next(ConnectionStatus.Disconnected)
console.log('WebSocket connection closed')
// @TODO re-open websocket on retry loop
},
},
closingObserver: {
next: () => console.log('Websocket subscription cancelled, websocket closing'),
next: () => {
console.log('Websocket subscription cancelled, websocket closing')
},
},
}
this.websocket$ = webSocket(fullConfig)
return this.websocket$
}
watch$ (): Observable<Update<T>> { return this.websocket$.asObservable() }
}

View File

@@ -26,4 +26,10 @@ export interface Bootstrapper<T> {
export interface DBCache<T>{
sequence: number,
data: T | { }
}
export enum ConnectionStatus {
Initializing = 'initializing',
Connected = 'connected',
Disconnected = 'disconnected',
}