mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-04-02 05:03:08 +00:00
chore: add timeout and sequence retrigger
This commit is contained in:
@@ -1,18 +1,9 @@
|
|||||||
import { Observable } from 'rxjs'
|
import { Observable, timeout } from 'rxjs'
|
||||||
import { webSocket } from 'rxjs/webSocket'
|
import { webSocket, WebSocketSubject } from 'rxjs/webSocket'
|
||||||
import { Update } from '../types'
|
import { Update } from '../types'
|
||||||
import { Source } from './source'
|
import { Source } from './source'
|
||||||
|
|
||||||
export class WebsocketSource<T> implements Source<T> {
|
export class WebsocketSource<T> implements Source<T> {
|
||||||
private websocket$ = webSocket<RPCResponse<Update<T>>>({
|
|
||||||
url: this.url,
|
|
||||||
openObserver: {
|
|
||||||
next: () => {
|
|
||||||
this.websocket$.next(this.document.cookie as any)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly url: string,
|
private readonly url: string,
|
||||||
// TODO: Remove fallback after client app is updated
|
// TODO: Remove fallback after client app is updated
|
||||||
@@ -20,7 +11,14 @@ export class WebsocketSource<T> implements Source<T> {
|
|||||||
) {}
|
) {}
|
||||||
|
|
||||||
watch$(): Observable<RPCResponse<Update<T>>> {
|
watch$(): Observable<RPCResponse<Update<T>>> {
|
||||||
return this.websocket$
|
const stream$: WebSocketSubject<RPCResponse<Update<T>>> = webSocket({
|
||||||
|
url: this.url,
|
||||||
|
openObserver: {
|
||||||
|
next: () => stream$.next(this.document.cookie as any),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
return stream$.pipe(timeout(60000))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -100,9 +100,12 @@ export class Store<T extends { [key: string]: any }> {
|
|||||||
|
|
||||||
update(update: Update<T>): void {
|
update(update: Update<T>): void {
|
||||||
if (this.isRevision(update)) {
|
if (this.isRevision(update)) {
|
||||||
// if old or known, return
|
// if old or known, re-trigger the latest revision
|
||||||
if (update.id <= this.cache.sequence || this.stash.get(update.id)) return
|
if (update.id <= this.cache.sequence || this.stash.get(update.id)) {
|
||||||
this.handleRevision(update)
|
this.updateSequence(this.cache.sequence)
|
||||||
|
} else {
|
||||||
|
this.handleRevision(update)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
this.handleDump(update)
|
this.handleDump(update)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user