remove connection status from lib

This commit is contained in:
Matt Hill
2021-07-05 20:36:44 -06:00
committed by Aiden McClelland
parent eb4a24e797
commit d84752007d
6 changed files with 45 additions and 66 deletions

View File

@@ -1,6 +1,7 @@
import { BehaviorSubject, concat, from, Observable, of } from 'rxjs'
import { catchError, concatMap, delay, skip, switchMap, take, tap } from 'rxjs/operators'
import { ConnectionStatus, Http, Update } from '../types'
import { concatMap, delay, map, skip, switchMap, take, tap } from 'rxjs/operators'
import { Store } from '../store'
import { Http, Update } from '../types'
import { Source } from './source'
export type PollConfig = {
@@ -8,26 +9,24 @@ export type PollConfig = {
}
export class PollSource<T> implements Source<T> {
connectionStatus$ = new BehaviorSubject(ConnectionStatus.Initializing)
constructor (
private readonly pollConfig: PollConfig,
private readonly http: Http<T>,
) { }
watch$ (sequence$: Observable<number>): Observable<Update<T>> {
watch$ (store: Store<T>): Observable<Update<T>> {
const sequence$ = store.watchCache$()
.pipe(
map(cache => cache.sequence),
)
const polling$ = new BehaviorSubject('')
const updates$ = of('').pipe(
concatMap(_ => sequence$),
take(1),
concatMap(seq => this.http.getRevisions(seq)),
tap(_ => this.connectionStatus$.next(ConnectionStatus.Connected)),
catchError(e => {
console.error(e)
this.connectionStatus$.next(ConnectionStatus.Disconnected)
return of([])
}),
)
const delay$ = of([]).pipe(

View File

@@ -1,7 +1,7 @@
import { BehaviorSubject, Observable } from 'rxjs'
import { ConnectionStatus, Update } from '../types'
import { Observable } from 'rxjs'
import { Store } from '../store'
import { Update } from '../types'
export interface Source<T> {
connectionStatus$: BehaviorSubject<ConnectionStatus>
watch$ (sequence$?: Observable<number>): Observable<Update<T>>
watch$ (store?: Store<T>): Observable<Update<T>>
}

View File

@@ -1,16 +1,14 @@
import { BehaviorSubject, Observable } from 'rxjs'
import { Observable } from 'rxjs'
import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'
import { ConnectionStatus, Update } from '../types'
import { Update } from '../types'
import { Source } from './source'
export class WebsocketSource<T> implements Source<T> {
connectionStatus$ = new BehaviorSubject(ConnectionStatus.Initializing)
private websocket$: WebSocketSubject<Update<T>> | undefined
constructor (
private readonly url: string,
) {
}
) { }
watch$ (): Observable<Update<T>> {
const fullConfig: WebSocketSubjectConfig<Update<T>> = {
@@ -18,21 +16,9 @@ export class WebsocketSource<T> implements Source<T> {
openObserver: {
next: () => {
console.log('WebSocket connection open')
this.connectionStatus$.next(ConnectionStatus.Connected)
this.websocket$!.next('open message' as any)
},
},
closeObserver: {
next: () => {
this.connectionStatus$.next(ConnectionStatus.Disconnected)
console.log('WebSocket connection closed')
},
},
closingObserver: {
next: () => {
console.log('Websocket subscription cancelled, websocket closing')
},
},
}
this.websocket$ = webSocket(fullConfig)
return this.websocket$