diff --git a/client/index.ts b/client/index.ts index 582ffe4..f2de622 100644 --- a/client/index.ts +++ b/client/index.ts @@ -1,7 +1,3 @@ -export * from './lib/source/mock-source' -export * from './lib/source/poll-source' -export * from './lib/source/ws-source' -export * from './lib/source/source' export * from './lib/json-patch-lib' export * from './lib/patch-db' export * from './lib/store' diff --git a/client/lib/patch-db.ts b/client/lib/patch-db.ts index 06d12f1..3347a1f 100644 --- a/client/lib/patch-db.ts +++ b/client/lib/patch-db.ts @@ -1,44 +1,18 @@ -import { EMPTY, merge, Observable, ReplaySubject, Subject } from 'rxjs' -import { catchError, switchMap } from 'rxjs/operators' +import { map, Observable, shareReplay } from 'rxjs' +import { tap } from 'rxjs/operators' import { Store } from './store' -import { DBCache, Http } from './types' -import { RPCError } from './source/ws-source' -import { Source } from './source/source' +import { DBCache, Update } from './types' export class PatchDB { - public store: Store = new Store(this.http, this.initialCache) - public connectionError$ = new Subject() - public rpcError$ = new Subject() - public cache$ = new ReplaySubject>(1) - - private sub = this.sources$ - .pipe( - switchMap(sources => - merge(...sources.map(s => s.watch$(this.store))).pipe( - catchError(e => { - this.connectionError$.next(e) - - return EMPTY - }), - ), - ), - ) - .subscribe(res => { - if ('result' in res) { - this.store.update(res.result) - this.cache$.next(this.store.cache) - } else { - this.rpcError$.next(res) - } - }) + public store: Store = new Store(this.initialCache) + public cache$ = this.source$.pipe( + tap(res => this.store.update(res)), + map(_ => this.store.cache), + shareReplay(1), + ) constructor( - private readonly sources$: Observable[]>, - private readonly http: Http, + private readonly source$: Observable>, private readonly initialCache: DBCache, ) {} - - clean() { - this.sub.unsubscribe() - } } diff --git a/client/lib/source/mock-source.ts b/client/lib/source/mock-source.ts deleted file mode 100644 index 5268d54..0000000 --- a/client/lib/source/mock-source.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { Observable } from 'rxjs' -import { map } from 'rxjs/operators' -import { Update } from '../types' -import { Source } from './source' -import { RPCResponse } from './ws-source' - -export class MockSource implements Source { - constructor(private readonly seed: Observable>) {} - - watch$(): Observable>> { - return this.seed.pipe(map(result => ({ result, jsonrpc: '2.0' }))) - } -} diff --git a/client/lib/source/poll-source.ts b/client/lib/source/poll-source.ts deleted file mode 100644 index cda6ac8..0000000 --- a/client/lib/source/poll-source.ts +++ /dev/null @@ -1,55 +0,0 @@ -import { BehaviorSubject, concat, from, Observable, of } from 'rxjs' -import { - concatMap, - delay, - map, - skip, - switchMap, - take, - tap, -} from 'rxjs/operators' -import { Store } from '../store' -import { Http, Update } from '../types' -import { Source } from './source' -import { RPCResponse } from './ws-source' - -export type PollConfig = { - cooldown: number -} - -export class PollSource implements Source { - constructor( - private readonly pollConfig: PollConfig, - private readonly http: Http, - ) {} - - watch$(store: Store): Observable>> { - const polling$ = new BehaviorSubject('') - - const updates$ = of({}).pipe( - concatMap(_ => store.sequence$), - concatMap(seq => this.http.getRevisions(seq)), - take(1), - ) - - const delay$ = of([]).pipe( - delay(this.pollConfig.cooldown), - tap(_ => polling$.next('')), - skip(1), - ) - - const poll$ = concat(updates$, delay$) - - return polling$.pipe( - switchMap(_ => poll$), - concatMap(res => { - if (Array.isArray(res)) { - return from(res) // takes Revision[] and converts it into Observable - } else { - return of(res) // takes Dump and converts it into Observable> - } - }), - map(result => ({ result, jsonrpc: '2.0' })), - ) - } -} diff --git a/client/lib/source/source.ts b/client/lib/source/source.ts deleted file mode 100644 index a02fb31..0000000 --- a/client/lib/source/source.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { Observable } from 'rxjs' -import { Store } from '../store' -import { Update } from '../types' -import { RPCResponse } from './ws-source' - -export interface Source { - watch$(store?: Store): Observable>> -} diff --git a/client/lib/source/ws-source.ts b/client/lib/source/ws-source.ts deleted file mode 100644 index 4b20f77..0000000 --- a/client/lib/source/ws-source.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { Observable } from 'rxjs' -import { - webSocket, - WebSocketSubject, - WebSocketSubjectConfig, -} from 'rxjs/webSocket' -import { Update } from '../types' -import { Source } from './source' - -export class WebsocketSource implements Source { - private websocket$: WebSocketSubject>> | undefined - - constructor(private readonly url: string) {} - - watch$(): Observable>> { - const fullConfig: WebSocketSubjectConfig>> = { - url: this.url, - openObserver: { - next: () => { - this.websocket$!.next(document.cookie as any) - }, - }, - } - this.websocket$ = webSocket(fullConfig) - return this.websocket$ - } -} - -interface RPCBase { - jsonrpc: '2.0' -} - -export interface RPCSuccess extends RPCBase { - result: T -} - -export interface RPCError extends RPCBase { - error: { - code: number // 34 means unauthenticated - message: string - data: { - details: string - } - } -} -export type RPCResponse = RPCSuccess | RPCError - -class RpcError { - code: number - message: string - details: string - - constructor(e: RPCError['error']) { - this.code = e.code - this.message = e.message - this.details = e.data.details - } -} diff --git a/client/lib/store.ts b/client/lib/store.ts index c0e2cc3..677128b 100644 --- a/client/lib/store.ts +++ b/client/lib/store.ts @@ -1,5 +1,5 @@ -import { DBCache, Dump, Http, Revision, Update } from './types' -import { BehaviorSubject, Observable, ReplaySubject } from 'rxjs' +import { DBCache, Dump, Revision, Update } from './types' +import { BehaviorSubject, Observable } from 'rxjs' import { applyOperation, getValueByPointer, Operation } from './json-patch-lib' import BTree from 'sorted-btree' @@ -9,18 +9,11 @@ export interface StashEntry { } export class Store { - cache: DBCache - sequence$: BehaviorSubject - private watchedNodes: { [path: string]: ReplaySubject } = {} + readonly sequence$ = new BehaviorSubject(this.cache.sequence) + private watchedNodes: { [path: string]: BehaviorSubject } = {} private stash = new BTree() - constructor( - private readonly http: Http, - private readonly initialCache: DBCache, - ) { - this.cache = this.initialCache - this.sequence$ = new BehaviorSubject(initialCache.sequence) - } + constructor(public cache: DBCache) {} watch$(): Observable watch$(p1: P1): Observable> @@ -98,18 +91,32 @@ export class Store { > watch$(...args: (string | number)[]): Observable { const path = `/${args.join('/')}` - if (!this.watchedNodes[path]) { - this.watchedNodes[path] = new ReplaySubject(1) + + return new Observable(subscriber => { + const value = getValueByPointer(this.cache.data, path) + const source = this.watchedNodes[path] || new BehaviorSubject(value) + const subscription = source.subscribe(subscriber) + + this.watchedNodes[path] = source this.updateValue(path) - } - return this.watchedNodes[path].asObservable() + + return () => { + subscription.unsubscribe() + + if (!source.observed) { + source.complete() + delete this.watchedNodes[path] + } + } + }) } update(update: Update): void { if (this.isRevision(update)) { - // if old or known, return - if (update.id <= this.cache.sequence || this.stash.get(update.id)) return - this.handleRevision(update) + // Handle revision if new and not known + if (update.id > this.cache.sequence && !this.stash.get(update.id)) { + this.handleRevision(update) + } } else { this.handleDump(update) } @@ -133,14 +140,7 @@ export class Store { } private handleRevision(revision: Revision): void { - // stash the revision this.stash.set(revision.id, { revision, undo: [] }) - - // if revision is futuristic, fetch missing revisions - if (revision.id > this.cache.sequence + 1) { - this.http.getRevisions(this.cache.sequence) - } - this.processStashed(revision.id) } @@ -204,14 +204,7 @@ export class Store { } private updateWatchedNodes(revisionPath: string) { - const kill = (path: string) => { - this.watchedNodes[path].complete() - delete this.watchedNodes[path] - } - Object.keys(this.watchedNodes).forEach(path => { - if (this.watchedNodes[path].observers.length === 0) return kill(path) - if (path.includes(revisionPath) || revisionPath.includes(path)) { this.updateValue(path) } diff --git a/client/lib/types.ts b/client/lib/types.ts index 657692c..0a9aa2f 100644 --- a/client/lib/types.ts +++ b/client/lib/types.ts @@ -18,11 +18,6 @@ export enum PatchOp { REPLACE = 'replace', } -export interface Http { - getRevisions(since: number): Promise> - getDump(): Promise> -} - export interface Bootstrapper { init(): Promise> update(cache: DBCache): Promise diff --git a/client/package.json b/client/package.json index 8d86da0..51cbd57 100644 --- a/client/package.json +++ b/client/package.json @@ -15,7 +15,7 @@ "uuid": "8.3.2" }, "peerDependencies": { - "rxjs": ">=6.0.0" + "rxjs": ">=7.0.0" }, "devDependencies": { "@types/node": "16.4.13",