diff --git a/client/lib/source/ws-source.ts b/client/lib/source/ws-source.ts index fedb739..cd1a4f7 100644 --- a/client/lib/source/ws-source.ts +++ b/client/lib/source/ws-source.ts @@ -1,18 +1,9 @@ -import { Observable } from 'rxjs' -import { webSocket } from 'rxjs/webSocket' +import { Observable, timeout } from 'rxjs' +import { webSocket, WebSocketSubject } from 'rxjs/webSocket' import { Update } from '../types' import { Source } from './source' export class WebsocketSource implements Source { - private websocket$ = webSocket>>({ - url: this.url, - openObserver: { - next: () => { - this.websocket$.next(this.document.cookie as any) - }, - }, - }) - constructor( private readonly url: string, // TODO: Remove fallback after client app is updated @@ -20,7 +11,14 @@ export class WebsocketSource implements Source { ) {} watch$(): Observable>> { - return this.websocket$ + const stream$: WebSocketSubject>> = webSocket({ + url: this.url, + openObserver: { + next: () => stream$.next(this.document.cookie as any), + }, + }) + + return stream$.pipe(timeout(60000)) } } diff --git a/client/lib/store.ts b/client/lib/store.ts index 65409a5..c24cee5 100644 --- a/client/lib/store.ts +++ b/client/lib/store.ts @@ -100,9 +100,12 @@ export class Store { update(update: Update): void { if (this.isRevision(update)) { - // if old or known, return - if (update.id <= this.cache.sequence || this.stash.get(update.id)) return - this.handleRevision(update) + // if old or known, re-trigger the latest revision + if (update.id <= this.cache.sequence || this.stash.get(update.id)) { + this.updateSequence(this.cache.sequence) + } else { + this.handleRevision(update) + } } else { this.handleDump(update) }