diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index 06d12f1..cc37938 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -2,43 +2,35 @@ import { EMPTY, merge, Observable, ReplaySubject, Subject } from 'rxjs' import { catchError, switchMap } from 'rxjs/operators' import { Store } from './store' import { DBCache, Http } from './types' -import { RPCError } from './source/ws-source' import { Source } from './source/source' export class PatchDB { public store: Store = new Store(this.http, this.initialCache) - public connectionError$ = new Subject() - public rpcError$ = new Subject() + public connectionError$ = new Subject() public cache$ = new ReplaySubject>(1) - private sub = this.sources$ - .pipe( - switchMap(sources => - merge(...sources.map(s => s.watch$(this.store))).pipe( - catchError(e => { - this.connectionError$.next(e) - - return EMPTY - }), - ), - ), - ) - .subscribe(res => { - if ('result' in res) { - this.store.update(res.result) - this.cache$.next(this.store.cache) - } else { - this.rpcError$.next(res) - } - }) - constructor( private readonly sources$: Observable[]>, private readonly http: Http, private readonly initialCache: DBCache, ) {} - clean() { - this.sub.unsubscribe() + ngOnInit() { + this.sources$ + .pipe( + switchMap(sources => + merge(...sources.map(s => s.watch$(this.store))).pipe( + catchError(e => { + this.connectionError$.next(e) + + return EMPTY + }), + ), + ), + ) + .subscribe(res => { + this.store.update(res) + this.cache$.next(this.store.cache) + }) } } diff --git a/client/lib/source/mock-source.ts b/client/lib/source/mock-source.ts index 5268d54..7791046 100644 --- a/client/lib/source/mock-source.ts +++ b/client/lib/source/mock-source.ts @@ -1,13 +1,11 @@ import { Observable } from 'rxjs' -import { map } from 'rxjs/operators' import { Update } from '../types' import { Source } from './source' -import { RPCResponse } from './ws-source' export class MockSource implements Source { constructor(private readonly seed: Observable>) {} - watch$(): Observable>> { - return this.seed.pipe(map(result => ({ result, jsonrpc: '2.0' }))) + watch$(): Observable> { + return this.seed } } diff --git a/client/lib/source/poll-source.ts b/client/lib/source/poll-source.ts index 3bf3423..9bab917 100644 --- a/client/lib/source/poll-source.ts +++ b/client/lib/source/poll-source.ts @@ -3,7 +3,6 @@ import { concatMap, map, take } from 'rxjs/operators' import { Store } from '../store' import { Http, Update } from '../types' import { Source } from './source' -import { RPCResponse } from './ws-source' export type PollConfig = { cooldown: number @@ -15,13 +14,12 @@ export class PollSource implements Source { private readonly http: Http, ) {} - watch$({ sequence$ }: Store): Observable>> { + watch$({ sequence$ }: Store): Observable> { return sequence$.pipe( concatMap(seq => this.http.getRevisions(seq)), take(1), - // Revision[] converted it into Observable, Dump into Observable> + // convert Revision[] it into Observable. Convert Dump into Observable> concatMap(res => (Array.isArray(res) ? from(res) : of(res))), - map(result => ({ result, jsonrpc: '2.0' as const })), repeatWhen(() => timer(this.pollConfig.cooldown)), ) } diff --git a/client/lib/source/source.ts b/client/lib/source/source.ts index a02fb31..4e69eb0 100644 --- a/client/lib/source/source.ts +++ b/client/lib/source/source.ts @@ -1,8 +1,7 @@ import { Observable } from 'rxjs' import { Store } from '../store' import { Update } from '../types' -import { RPCResponse } from './ws-source' export interface Source { - watch$(store?: Store): Observable>> + watch$(store?: Store): Observable> } diff --git a/client/lib/source/ws-source.ts b/client/lib/source/ws-source.ts index 5ae027a..1e8d531 100644 --- a/client/lib/source/ws-source.ts +++ b/client/lib/source/ws-source.ts @@ -1,54 +1,12 @@ import { Observable, timeout } from 'rxjs' -import { webSocket, WebSocketSubject } from 'rxjs/webSocket' +import { webSocket } from 'rxjs/webSocket' import { Update } from '../types' import { Source } from './source' export class WebsocketSource implements Source { - constructor( - private readonly url: string, - // TODO: Remove fallback after client app is updated - private readonly document: Document = document, - ) {} + constructor(private readonly url: string) {} - watch$(): Observable>> { - const stream$: WebSocketSubject>> = webSocket({ - url: this.url, - openObserver: { - next: () => stream$.next(this.document.cookie as any), - }, - }) - - return stream$.pipe(timeout({ first: 60000 })) - } -} - -interface RPCBase { - jsonrpc: '2.0' -} - -export interface RPCSuccess extends RPCBase { - result: T -} - -export interface RPCError extends RPCBase { - error: { - code: number // 34 means unauthenticated - message: string - data: { - details: string - } - } -} -export type RPCResponse = RPCSuccess | RPCError - -class RpcError { - code: number - message: string - details: string - - constructor(e: RPCError['error']) { - this.code = e.code - this.message = e.message - this.details = e.data.details + watch$(): Observable> { + return webSocket>(this.url).pipe(timeout({ first: 60000 })) } }