diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index 179a1a6..cf4ecd1 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -1,5 +1,5 @@ import { merge, Observable, of } from 'rxjs' -import { concatMap, finalize, tap } from 'rxjs/operators' +import { concatMap, tap } from 'rxjs/operators' import { Source } from './source/source' import { Store } from './store' import { DBCache, Http } from './types' @@ -22,4 +22,8 @@ export class PatchDB { concatMap(() => of(this.store.cache)), ) } + + connectionMade$ (): Observable { + return merge(...this.sources.map(s => s.connectionMade$)) + } } diff --git a/client/lib/source/poll-source.ts b/client/lib/source/poll-source.ts index d209dac..ba7f7fe 100644 --- a/client/lib/source/poll-source.ts +++ b/client/lib/source/poll-source.ts @@ -1,4 +1,4 @@ -import { BehaviorSubject, concat, from, Observable, of } from 'rxjs' +import { BehaviorSubject, concat, from, Observable, of, Subject } from 'rxjs' import { concatMap, delay, skip, switchMap, take, tap } from 'rxjs/operators' import { Store } from '../store' import { Http, Update } from '../types' @@ -9,6 +9,7 @@ export type PollConfig = { } export class PollSource implements Source { + connectionMade$ = new Subject() constructor ( private readonly pollConfig: PollConfig, @@ -34,14 +35,15 @@ export class PollSource implements Source { const poll$ = concat(updates$, delay$) return polling$.pipe( - switchMap(_ => poll$), - concatMap(res => { + switchMap(_ => poll$), + concatMap(res => { + this.connectionMade$.next() if (Array.isArray(res)) { return from(res) // takes Revision[] and converts it into Observable } else { return of(res) // takes Dump and converts it into Observable> } - }), + }), ) } } diff --git a/client/lib/source/source.ts b/client/lib/source/source.ts index 45cf1b8..0f452e9 100644 --- a/client/lib/source/source.ts +++ b/client/lib/source/source.ts @@ -4,4 +4,5 @@ import { Update } from '../types' export interface Source { watch$ (store?: Store): Observable> + connectionMade$: Observable } diff --git a/client/lib/source/ws-source.ts b/client/lib/source/ws-source.ts index 4a9fe21..7ae4c92 100644 --- a/client/lib/source/ws-source.ts +++ b/client/lib/source/ws-source.ts @@ -1,10 +1,11 @@ -import { Observable } from 'rxjs' +import { Observable, Subject } from 'rxjs' import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket' import { Update } from '../types' import { Source } from './source' export class WebsocketSource implements Source { private websocket$: WebSocketSubject> | undefined + connectionMade$ = new Subject() constructor ( private readonly url: string, @@ -15,6 +16,7 @@ export class WebsocketSource implements Source { url: this.url, openObserver: { next: () => { + this.connectionMade$.next() this.websocket$!.next(document.cookie as any) }, },