diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index f1b2d90..b30864e 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -1,26 +1,46 @@ -import { merge, Observable, of } from 'rxjs' -import { concatMap, finalize, tap } from 'rxjs/operators' -import { Source } from './source/source' +import { merge, Observable, Subject, Subscription } from 'rxjs' import { Store } from './store' import { DBCache, Http } from './types' +import { RPCError } from './source/ws-source' +import { Source } from './source/source' export class PatchDB { - store: Store + public store: Store = new Store(this.http, this.initialCache) + public connectionError$ = new Subject() + public rpcError$ = new Subject() + public cache$ = new Subject>() + + private updatesSub?: Subscription + private sourcesSub = this.sources$.subscribe(sources => { + this.updatesSub = merge(...sources.map(s => s.watch$(this.store))).subscribe({ + next: (res) => { + console.log('PatchDB -> subscribedChanges -> JCWM:', { res }) + if ('result' in res) { + this.store.update(res.result) + this.cache$.next(this.store.cache) + } + else { + this.rpcError$.next(res) + } + }, + error: (e) => { + this.connectionError$.next(e) + }, + }) + }) constructor ( - private readonly sources: Source[], + private readonly sources$: Observable[]>, private readonly http: Http, private readonly initialCache: DBCache, ) { - this.store = new Store(this.http, this.initialCache) + console.log('STARTING PATCH 1') } - sync$ (): Observable> { - return merge(...this.sources.map(s => s.watch$(this.store))) - .pipe( - tap(update => this.store.update(update)), - concatMap(() => of(this.store.cache)), - finalize(() => this.store.reset()), - ) + clean () { + this.sourcesSub.unsubscribe() + if (this.updatesSub) { + this.updatesSub.unsubscribe() + } } } diff --git a/client/lib/source/mock-source.ts b/client/lib/source/mock-source.ts index f67398f..93dd68c 100644 --- a/client/lib/source/mock-source.ts +++ b/client/lib/source/mock-source.ts @@ -1,6 +1,8 @@ import { Observable } from 'rxjs' +import { map } from 'rxjs/operators' import { Update } from '../types' import { Source } from './source' +import { RPCResponse } from './ws-source' export class MockSource implements Source { @@ -8,7 +10,8 @@ export class MockSource implements Source { private readonly seed: Observable>, ) { } - watch$ (): Observable> { - return this.seed - } + watch$ (): Observable>> { + return this.seed.pipe(map(result => ({ result, + jsonrpc: '2.0' }))) + } } diff --git a/client/lib/source/poll-source.ts b/client/lib/source/poll-source.ts index fe8f9b4..41d548a 100644 --- a/client/lib/source/poll-source.ts +++ b/client/lib/source/poll-source.ts @@ -1,8 +1,9 @@ import { BehaviorSubject, concat, from, Observable, of, Subject } from 'rxjs' -import { concatMap, delay, skip, switchMap, take, tap } from 'rxjs/operators' +import { concatMap, delay, map, skip, switchMap, take, tap } from 'rxjs/operators' import { Store } from '../store' import { Http, Update } from '../types' import { Source } from './source' +import { RPCResponse } from './ws-source' export type PollConfig = { cooldown: number @@ -15,7 +16,7 @@ export class PollSource implements Source { private readonly http: Http, ) { } - watch$ (store: Store): Observable> { + watch$ (store: Store): Observable>> { const polling$ = new BehaviorSubject('') const updates$ = of({ }) @@ -42,6 +43,8 @@ export class PollSource implements Source { return of(res) // takes Dump and converts it into Observable> } }), + map(result => ({ result, + jsonrpc: '2.0' })), ) } } diff --git a/client/lib/source/source.ts b/client/lib/source/source.ts index 45cf1b8..f608baa 100644 --- a/client/lib/source/source.ts +++ b/client/lib/source/source.ts @@ -1,7 +1,8 @@ import { Observable } from 'rxjs' import { Store } from '../store' import { Update } from '../types' +import { RPCResponse } from './ws-source' export interface Source { - watch$ (store?: Store): Observable> + watch$ (store?: Store): Observable>> } diff --git a/client/lib/source/ws-source.ts b/client/lib/source/ws-source.ts index 53b757d..fdb73db 100644 --- a/client/lib/source/ws-source.ts +++ b/client/lib/source/ws-source.ts @@ -1,5 +1,4 @@ import { Observable } from 'rxjs' -import { map } from 'rxjs/operators' import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket' import { Update } from '../types' import { Source } from './source' @@ -11,7 +10,7 @@ export class WebsocketSource implements Source { private readonly url: string, ) { } - watch$ (): Observable> { + watch$ (): Observable>> { const fullConfig: WebSocketSubjectConfig>> = { url: this.url, openObserver: { @@ -21,12 +20,7 @@ export class WebsocketSource implements Source { }, } this.websocket$ = webSocket(fullConfig) - return this.websocket$.pipe( - map(res => { - if (isRpcSuccess(res)) return res.result - if (isRpcError(res)) throw new RpcError(res.error) - }), - ) as Observable> + return this.websocket$ } } @@ -47,7 +41,6 @@ export interface RPCError extends RPCBase { } } } - export type RPCResponse = RPCSuccess | RPCError function isRpcError (arg: { error: Error } | { result: Result}): arg is { error: Error } {