From 6b16474a2a5d3ad0c2b29b0fc46c54a5bf13e92b Mon Sep 17 00:00:00 2001 From: waterplea Date: Fri, 5 Aug 2022 19:12:11 +0300 Subject: [PATCH] chore: refactor rxjs --- client/lib/source/poll-source.ts | 50 ++++++++++---------------------- client/lib/source/ws-source.ts | 29 +++++++++--------- client/lib/store.ts | 11 ++----- 3 files changed, 31 insertions(+), 59 deletions(-) diff --git a/client/lib/source/poll-source.ts b/client/lib/source/poll-source.ts index cda6ac8..39783b2 100644 --- a/client/lib/source/poll-source.ts +++ b/client/lib/source/poll-source.ts @@ -1,13 +1,14 @@ -import { BehaviorSubject, concat, from, Observable, of } from 'rxjs' import { - concatMap, - delay, - map, - skip, - switchMap, - take, - tap, -} from 'rxjs/operators' + BehaviorSubject, + concat, + from, + ignoreElements, + Observable, + of, + repeatWhen, + timer, +} from 'rxjs' +import { concatMap, map, switchMap, take, tap } from 'rxjs/operators' import { Store } from '../store' import { Http, Update } from '../types' import { Source } from './source' @@ -23,33 +24,14 @@ export class PollSource implements Source { private readonly http: Http, ) {} - watch$(store: Store): Observable>> { - const polling$ = new BehaviorSubject('') - - const updates$ = of({}).pipe( - concatMap(_ => store.sequence$), + watch$({ sequence$ }: Store): Observable>> { + return sequence$.pipe( concatMap(seq => this.http.getRevisions(seq)), take(1), - ) - - const delay$ = of([]).pipe( - delay(this.pollConfig.cooldown), - tap(_ => polling$.next('')), - skip(1), - ) - - const poll$ = concat(updates$, delay$) - - return polling$.pipe( - switchMap(_ => poll$), - concatMap(res => { - if (Array.isArray(res)) { - return from(res) // takes Revision[] and converts it into Observable - } else { - return of(res) // takes Dump and converts it into Observable> - } - }), - map(result => ({ result, jsonrpc: '2.0' })), + // Revision[] converted it into Observable, Dump into Observable> + concatMap(res => (Array.isArray(res) ? from(res) : of(res))), + map(result => ({ result, jsonrpc: '2.0' as const })), + repeatWhen(() => timer(this.pollConfig.cooldown)), ) } } diff --git a/client/lib/source/ws-source.ts b/client/lib/source/ws-source.ts index 4b20f77..3470cd0 100644 --- a/client/lib/source/ws-source.ts +++ b/client/lib/source/ws-source.ts @@ -1,27 +1,24 @@ import { Observable } from 'rxjs' -import { - webSocket, - WebSocketSubject, - WebSocketSubjectConfig, -} from 'rxjs/webSocket' +import { webSocket, WebSocketSubject } from 'rxjs/webSocket' import { Update } from '../types' import { Source } from './source' export class WebsocketSource implements Source { - private websocket$: WebSocketSubject>> | undefined + private websocket$: WebSocketSubject>> = webSocket({ + url: this.url, + openObserver: { + next: () => { + this.websocket$.next(this.document.cookie as any) + }, + }, + }) - constructor(private readonly url: string) {} + constructor( + private readonly url: string, + private readonly document: Document, + ) {} watch$(): Observable>> { - const fullConfig: WebSocketSubjectConfig>> = { - url: this.url, - openObserver: { - next: () => { - this.websocket$!.next(document.cookie as any) - }, - }, - } - this.websocket$ = webSocket(fullConfig) return this.websocket$ } } diff --git a/client/lib/store.ts b/client/lib/store.ts index c0e2cc3..65409a5 100644 --- a/client/lib/store.ts +++ b/client/lib/store.ts @@ -9,18 +9,11 @@ export interface StashEntry { } export class Store { - cache: DBCache - sequence$: BehaviorSubject + readonly sequence$ = new BehaviorSubject(this.cache.sequence) private watchedNodes: { [path: string]: ReplaySubject } = {} private stash = new BTree() - constructor( - private readonly http: Http, - private readonly initialCache: DBCache, - ) { - this.cache = this.initialCache - this.sequence$ = new BehaviorSubject(initialCache.sequence) - } + constructor(private readonly http: Http, public cache: DBCache) {} watch$(): Observable watch$(p1: P1): Observable>