src/StreamWalker.ts
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/explicit-function-return-type */
import {Event} from './event'
import * as eventstoreError from './errors'
/**
* Stream walker
*/
export class StreamWalker {
/** iterable */
protected iterable: AsyncIterableIterator<Event | null>
/**
* Creates an instance of stream walker.
*/
public constructor(iterable: AsyncIterableIterator<Event | null>) {
this.iterable = iterable
}
/** standard async iterable function */
public async *[Symbol.asyncIterator]() {
for await (const value of this.iterable) {
yield value
}
}
/**
* The map() method creates a new iterator with the results of calling a provided function on every element in the calling iterator
*/
public map(fn: Function, thisArg?: Function): StreamWalker {
if (typeof fn !== 'function') {
throw eventstoreError.newImplementationError(fn + 'is not a function')
}
const a = async function*(iterable: AsyncIterable<Event | null>) {
for await (const value of iterable) {
yield await fn.call(thisArg, value)
}
}
return new StreamWalker(a(this.iterable))
}
/**
* The filter() method creates a new iterator with all elements that pass the test implemented by the provided function
*/
public filter(fn: Function, thisArg?: Function): StreamWalker {
if (typeof fn !== 'function') {
throw eventstoreError.newImplementationError(fn + 'is not a function')
}
const iterable = this.iterable
const b = async function*(innerFn: Function, thisInnerArg?: Function) {
for await (const value of iterable) {
if (await innerFn.call(thisInnerArg, value)) {
yield value
}
}
}
return new StreamWalker(b(fn, thisArg))
}
/**
* The forEach() method executes a provided function once for each iterator element
*/
public async forEach(fn: Function, thisArg?: Function, ...args: any[]): Promise<void> {
if (typeof fn !== 'function') {
throw eventstoreError.newImplementationError(fn + 'is not a function')
}
const iterable = this.iterable
for await (const value of iterable) {
await fn.call(thisArg, value, ...args)
}
}
/**
* The reduce() method applies a function against an accumulator and each element in the iterator (from left to right) to reduce it to a single value
*/
public async reduce(
accumulatorFunction: Function,
initialValue: any = null,
thisArg?: Function
): Promise<any> {
if (typeof accumulatorFunction !== 'function') {
throw eventstoreError.newImplementationError(accumulatorFunction + 'is not a function')
}
const iterable = this.iterable
let returnValue = initialValue
for await (const value of iterable) {
returnValue = await accumulatorFunction.call(thisArg, returnValue, value)
}
return returnValue
}
/**
* Converts an iterator to an array.
* The returned array will contain all single elements of iterator
*/
public async toArray(): Promise<(Event | null)[]> {
const iterable = this.iterable
const arrayValue = []
for await (const value of iterable) {
arrayValue.push(value)
}
return arrayValue
}
/**
* The every() method tests whether all elements in the iterator pass the test implemented by the provided function
*/
public async every(fn: Function, thisArg?: Function): Promise<boolean> {
if (typeof fn !== 'function') {
throw eventstoreError.newImplementationError(fn + 'is not a function')
}
const iterable = this.iterable
for await (const value of iterable) {
if ((await fn.call(thisArg, value)) === false) {
return false
}
}
return true
}
}