mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 10:21:53 +00:00
updates to bootstrapper
This commit is contained in:
committed by
Aiden McClelland
parent
9ad59a4d1a
commit
416d622df4
@@ -1,5 +1,5 @@
|
|||||||
import { EMPTY, from, merge, Observable, of, Subject, timer } from 'rxjs'
|
import { EMPTY, from, merge, Observable, of, Subject, timer } from 'rxjs'
|
||||||
import { catchError, concatMap, delay, finalize, map, skip, take, takeUntil, tap, throttleTime } from 'rxjs/operators'
|
import { catchError, concatMap, debounce, debounceTime, delay, finalize, map, skip, take, takeUntil, tap, throttleTime } from 'rxjs/operators'
|
||||||
import { Source } from './source/source'
|
import { Source } from './source/source'
|
||||||
import { Dump, SequenceStore, Result, Revision } from './sequence-store'
|
import { Dump, SequenceStore, Result, Revision } from './sequence-store'
|
||||||
import { Store } from './store'
|
import { Store } from './store'
|
||||||
@@ -23,15 +23,14 @@ export class PatchDB<T extends object> {
|
|||||||
|
|
||||||
let sequence: number = 0
|
let sequence: number = 0
|
||||||
let data: T = { } as T
|
let data: T = { } as T
|
||||||
if (bootstrapper) {
|
try {
|
||||||
try {
|
const cache = await bootstrapper.init()
|
||||||
const cache = await bootstrapper.init()
|
console.log('bootstrapped: ', cache)
|
||||||
console.log('FROM CACHE', cache)
|
sequence = cache.sequence
|
||||||
sequence = cache.sequence
|
data = cache.data
|
||||||
data = cache.data
|
} catch (e) {
|
||||||
} catch (e) {
|
// @TODO what to do if bootstrapper fails?
|
||||||
console.error('bootstrapper failed: ', e)
|
console.error('bootstrapper failed: ', e)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const store = new Store(data)
|
const store = new Store(data)
|
||||||
@@ -39,25 +38,24 @@ export class PatchDB<T extends object> {
|
|||||||
const sequenceStore = new SequenceStore(store, sequence)
|
const sequenceStore = new SequenceStore(store, sequence)
|
||||||
|
|
||||||
// update cache when sequenceStore emits, throttled
|
// update cache when sequenceStore emits, throttled
|
||||||
if (bootstrapper) {
|
sequenceStore.watch$().pipe(debounceTime(500), skip(1)).subscribe(({ data, sequence }) => {
|
||||||
sequenceStore.watch$().pipe(throttleTime(500), delay(500), skip(1)).subscribe(({ data, sequence }) => {
|
console.log('PATCHDB - update cache(): ', sequence, data)
|
||||||
console.log('PATCHDB - update cache(): ', sequence, data)
|
bootstrapper.update({ sequence, data }).catch(e => {
|
||||||
bootstrapper.update({ sequence, data }).catch(e => {
|
console.error('Exception in updateCache: ', e)
|
||||||
console.error('Exception in updateCache: ', e)
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
}
|
})
|
||||||
|
|
||||||
return new PatchDB(sources, http, sequenceStore, timeoutForMissingRevision)
|
return new PatchDB(sources, http, sequenceStore, timeoutForMissingRevision)
|
||||||
}
|
}
|
||||||
|
|
||||||
sync$ (): Observable<void> {
|
sync$ (): Observable<void> {
|
||||||
console.log('PATCHDB - sync$()')
|
console.log('PATCHDB - sync$()')
|
||||||
|
|
||||||
const sequence$ = this.sequenceStore.watch$().pipe(map(cache => cache.sequence))
|
const sequence$ = this.sequenceStore.watch$().pipe(map(cache => cache.sequence))
|
||||||
// nested concatMaps, as it is written, ensure sync is not run for update2 until handleSyncResult is complete for update1.
|
// nested concatMaps, as it is written, ensure sync is not run for update2 until handleSyncResult is complete for update1.
|
||||||
// flat concatMaps would allow many syncs to run while handleSyncResult was hanging. We can consider such an idea if performance requires it.
|
// flat concatMaps would allow many syncs to run while handleSyncResult was hanging. We can consider such an idea if performance requires it.
|
||||||
return merge(...this.sources.map(s => s.watch$(sequence$))).pipe(
|
return merge(...this.sources.map(s => s.watch$(sequence$))).pipe(
|
||||||
tap(update => console.log('PATCHDB - sources updated:', update)),
|
tap(update => console.log('PATCHDB - source updated:', update)),
|
||||||
concatMap(update =>
|
concatMap(update =>
|
||||||
this.sequenceStore.update$(update).pipe(
|
this.sequenceStore.update$(update).pipe(
|
||||||
concatMap(res => this.handleSyncResult$(res)),
|
concatMap(res => this.handleSyncResult$(res)),
|
||||||
@@ -109,7 +107,7 @@ export class PatchDB<T extends object> {
|
|||||||
export type PatchDbConfig<T> = {
|
export type PatchDbConfig<T> = {
|
||||||
http: Http<T>
|
http: Http<T>
|
||||||
sources: Source<T>[]
|
sources: Source<T>[]
|
||||||
bootstrapper?: Bootstrapper<T>
|
bootstrapper: Bootstrapper<T>
|
||||||
timeoutForMissingRevision?: number
|
timeoutForMissingRevision?: number
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,7 +125,6 @@ export interface Http<T> {
|
|||||||
export interface Bootstrapper<T> {
|
export interface Bootstrapper<T> {
|
||||||
init (): Promise<DBCache<T>>
|
init (): Promise<DBCache<T>>
|
||||||
update (cache: DBCache<T>): Promise<void>
|
update (cache: DBCache<T>): Promise<void>
|
||||||
clear (): Promise<void>
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface DBCache<T>{
|
export interface DBCache<T>{
|
||||||
|
|||||||
@@ -28,8 +28,8 @@
|
|||||||
"chai": "^4.2.0",
|
"chai": "^4.2.0",
|
||||||
"chai-string": "^1.5.0",
|
"chai-string": "^1.5.0",
|
||||||
"mocha": "^8.2.1",
|
"mocha": "^8.2.1",
|
||||||
"tslint": "^6.1.0",
|
|
||||||
"ts-node": "^9.1.1",
|
"ts-node": "^9.1.1",
|
||||||
|
"tslint": "^6.1.0",
|
||||||
"typescript": "4.1.5"
|
"typescript": "4.1.5"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,9 +18,4 @@ export class MockBootstrapper<T> implements Bootstrapper<T> {
|
|||||||
this.sequence = cache.sequence
|
this.sequence = cache.sequence
|
||||||
this.data = cache.data
|
this.data = cache.data
|
||||||
}
|
}
|
||||||
|
|
||||||
async clear (): Promise<void> {
|
|
||||||
this.sequence = 0
|
|
||||||
this.data = { } as T
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user