diff --git a/src/flatscan.ts b/src/flatscan.ts index 401d34aeb..7c39da294 100644 --- a/src/flatscan.ts +++ b/src/flatscan.ts @@ -14,7 +14,7 @@ export function flatScanSeedless(src: Observable, f: Function2(src: Observable, seed: Out, f: Accumulator): Property { return src.scan(seed, f) .last() .withDesc(new Desc(src, "fold", [seed, f])); +} + +/** @hidden */ +export function foldSeedless(src: Observable, f: Accumulator): Property { + return src.scan(f) + .last() + .withDesc(new Desc(src, "fold", [f])); } \ No newline at end of file diff --git a/src/observable.ts b/src/observable.ts index effc7a0b8..400c5cba3 100644 --- a/src/observable.ts +++ b/src/observable.ts @@ -20,7 +20,7 @@ import doLogT from "./dolog"; import doErrorT from "./doerror"; import doActionT from "./doaction"; import doEndT from "./doend"; -import { Accumulator, default as scan } from "./scan"; +import { Accumulator, scanSeedless, default as scan } from "./scan"; import mapEndT from "./mapend"; import mapErrorT from "./maperror"; import { SpawnerOrObservable, EventSpawner, EventOrValue } from "./flatmap_"; @@ -40,7 +40,7 @@ import { filter } from "./filter"; import { and, not, or } from "./boolean"; import flatMapFirst from "./flatmapfirst"; import addPropertyInitValueToStream from "./internal/addpropertyinitialvaluetostream"; -import fold from "./fold"; +import { default as fold, foldSeedless } from "./fold"; import { startWithE, startWithP } from "./startwith"; import takeUntil from "./takeuntil"; import flatMap from "./flatmap"; @@ -407,8 +407,16 @@ Works like [`scan`](#scan) but only emits the final value, i.e. the value just before the observable ends. Returns a [`Property`](property.html). */ - fold(seed: V2, f: Accumulator): Property { - return fold(this, seed, f) + + fold(seed: V2, f: Accumulator): Property + + fold(f: Accumulator): Property + + fold(seed: V2 | Accumulator, f?: Accumulator): Property { + if (arguments.length === 1) { + return foldSeedless(this, seed as any as Accumulator); + } + return fold(this, seed as any as V2, f as any as Accumulator) } /** @@ -596,8 +604,15 @@ Only applicable for observables with arrays as values. } /** A synonym for [scan](#scan). */ - reduce(seed: V2, f: Accumulator): Property { - return fold(this, seed, f) + reduce(seed: V2, f: Accumulator): Property + + reduce(f: Accumulator): Property + + reduce(seed: V2 | Accumulator, f?: Accumulator): Property { + if (arguments.length === 1) { + return foldSeedless(this, seed as any as Accumulator); + } + return fold(this, seed as any as V2, f as any as Accumulator) } /** @@ -639,8 +654,16 @@ identically to EventStream.scan: the `seed` will be the initial value of seed won't be output as is. Instead, the initial value of `r` will be `f(seed, x)`. This makes sense, because there can only be 1 initial value for a Property at a time. */ - scan(seed: V2, f: Accumulator): Property { - return scan(this, seed, f) + + scan(seed: V2, f: Accumulator): Property + + scan(f: Accumulator): Property + + scan(seed: V2 | Accumulator, f?: Accumulator): Property { + if (arguments.length === 1) { + return scanSeedless(this, seed as any as Accumulator); + } + return scan(this, seed as any as V2, f as any as Accumulator) } /** Skips the first n elements from the stream diff --git a/src/scan.ts b/src/scan.ts index 63bd39f30..657421291 100644 --- a/src/scan.ts +++ b/src/scan.ts @@ -1,5 +1,5 @@ import Observable from "./observable"; -import { Property } from "./observable";; +import { Property } from "./observable"; import { Event, hasValue, Initial } from "./event"; import { more, noMore } from "./reply"; import { nop } from "./helpers"; @@ -58,3 +58,29 @@ export default function scan(src: Observable, seed: Out, f: Accumul } return resultProperty = new Property(new Desc(src, "scan", [seed, f]), subscribe) } + +/** @hidden */ +export function scanSeedless(src: Observable, f: Accumulator): Property { + let acc: V; + let hasAccumulatedFirstValue: Boolean = false; + const subscribe: Subscribe = (sink: EventSink) => { + let unsub = src.subscribeInternal(function(event: Event) { + if (hasValue(event)) { + //console.log("has value: ", hasValue(event), "isInitial:", event.isInitial); + if (!hasAccumulatedFirstValue) { + acc = event.value; + hasAccumulatedFirstValue = true; + return sink(event); // let the initial event pass through + } + + acc = f(acc, event.value); + return sink(event.apply(acc)); + + } else { + return sink(event); + } + }); + return unsub; + } + return new Property(new Desc(src, "scan", [f]), subscribe) +} diff --git a/test/flatscan.ts b/test/flatscan.ts index 7b42965f2..a83243658 100644 --- a/test/flatscan.ts +++ b/test/flatscan.ts @@ -10,12 +10,6 @@ describe("EventStream.flatScan", function() { [0, 1, 3, error(), 6]) ); - describe("Without a seed value", () => - expectPropertyEvents( - () => series(1, [1, 2, error(), 3]).flatScan(addAsync(1)), - [1, 3, error(), 6]) - ); - describe("Serializes updates even when they occur while performing previous update", () => expectPropertyEvents( () => series(1, [1, 2, error(), 3]).flatScan(0, addAsync(5)), @@ -34,6 +28,20 @@ describe("EventStream.flatScan", function() { [0, 1, 3, error(), 6], semiunstable) ); + describe("Without a seed value", () => { + it ("accumulates values with given seed and accumulator function which returns a stream of updated values", () => + expectPropertyEvents( + () => series(1, [1, 2, error(), 3]).flatScan(addAsync(1)), + [1, 3, error(), 6] + ) + ); + it("Serializes updates even when they occur while performing previous update", () => + expectPropertyEvents( + () => series(1, [0, 1, 2, error(), 3]).flatScan(addAsync(5)), + [0, error(), 1, 3, 6], semiunstable) + ); + }); + return it("yields the seed value immediately", function() { const outputs: number[] = []; new Bacon.Bus().flatScan(0, (a, b) => 1).onValue(value => { outputs.push(value) }); diff --git a/test/fold.ts b/test/fold.ts index b6f2c2dcc..57f955ca1 100644 --- a/test/fold.ts +++ b/test/fold.ts @@ -8,7 +8,7 @@ describe("EventStream.fold", function() { ); describe("has reduce as synonym", () => expectPropertyEvents( - () => series(1, [1, 2, error(), 3]).fold(0, add), + () => series(1, [1, 2, error(), 3]).reduce(0, add), [error(), 6]) ); describe("works with synchronous source", () => @@ -16,6 +16,31 @@ describe("EventStream.fold", function() { () => fromArray([1, 2, error(), 3]).fold(0, add), [error(), 6], unstable) ); + + describe("Without seed value", function(){ + it("folds stream into a single-valued Property, passes through errors", () => + expectPropertyEvents( + () => series(1, [0, 1, 2, error(), 3]).fold(add), + [error(), 6]) + ); + it("has reduce as synonym", () => + expectPropertyEvents( + () => series(1, [1, 2, error(), 3]).reduce(add), + [error(), 6]) + ); + it("works with synchronous source", () => + expectPropertyEvents( + () => fromArray([0, 1, 2, error(), 3]).fold(add), + [error(), 6], unstable) + ); + it("works with really large chunks too, with { eager: true }", function() { + const count = 50000; + return expectPropertyEvents( + () => series(1, range(1, count, true)).fold((x: number,y: number) => x+1), + [count]); + }); + }); + return describe("works with really large chunks too, with { eager: true }", function() { const count = 50000; return expectPropertyEvents( @@ -24,10 +49,15 @@ describe("EventStream.fold", function() { }); }); -describe("Property.fold", () => +describe("Property.fold", () => { describe("Folds Property into a single-valued one", () => expectPropertyEvents( - () => series(1, [2,3]).toProperty(1).fold(0, add), + () => series(1, [2, 3]).toProperty(1).fold(0, add), + [6]) + ); + describe("Without seed value folds Property into a single-valued one", () => + expectPropertyEvents( + () => series(1, [2, 3]).toProperty(1).fold(add), [6]) ) -); \ No newline at end of file +}); \ No newline at end of file diff --git a/test/scan.ts b/test/scan.ts index 5fb056b7f..d88e50636 100644 --- a/test/scan.ts +++ b/test/scan.ts @@ -59,6 +59,69 @@ describe("EventStream.scan", function() { expect(count).to.equal(1); }); }); + + describe("Without a seed value", () => { + it("accumulates values and lets errors pass", () => + expectPropertyEvents( + () => series(1, [1, 2, 3, error(), 4]).scan(add), + [1, 3, 6, error(), 10], + unstable + ) + ); + it("yields null seed value", () => + expectPropertyEvents( + () => series(1, [null, 1]).scan(() => 1), + [null, 1], unstable) + ); + it("works with synchronous streams", () => + expectPropertyEvents( + () => fromArray([0, 1, 2, 3]).scan((x, y) => x + y), + [0, 1, 3, 6], unstable) + ); + it("works with merged synchronous streams", () => + expectPropertyEvents( + () => Bacon.mergeAll(once(0), once(1), once(2)).scan((a, b) => a + b), + [0, 1, 3], unstable) + ); + it("works with functions as values", () => + expectPropertyEvents( + () => series(1, [(() => 0), (() => 1), (() => 2)]).scan((a, b) => b).map(f => f()), + [0, 1, 2], unstable) + ); + describe("calls accumulator function once per value", function () { + describe("(simple case)", function () { + let count = 0; + expectPropertyEvents( + () => series(2, [0, 1, 2, 3]).scan(function (x, y) { + count++; + return x + y; + }), + [0, 1, 3, 6], + { + extraCheck() { + return it("calls accumulator once per value", () => expect(count).to.equal(3)); + }, + unstable + } + ); + }); + it("(when pushing to Bus in accumulator)", function () { + let count = 0; + const someBus = new Bacon.Bus(); + someBus.onValue(function () {}); + const src = new Bacon.Bus(); + const result = src.scan(function (_, __) { + someBus.push(null); + return count++; + }); + result.onValue(); + result.onValue(); + src.push(0); + src.push(1); + expect(count).to.equal(1); + }); + }); + }); }); describe("Property.scan", function() { @@ -97,4 +160,51 @@ describe("Property.scan", function() { [1]) ); }); + describe("without Seed value", function() { + it("with Init value, starts with init, f(init, xs[0])", () => + expectPropertyEvents( + () => series(1, [1,2,3]).toProperty(0).scan(add), + [0, 1, 3, 6], + unstable + ) + ); + it("without Init value, starts with seed", () => + expectPropertyEvents( + () => series(1, [0, 2,3]).toProperty().scan(add), + [0, 2, 5], + unstable + ) + ); + it("treats null init value like any other value", function() { + expectPropertyEvents( + () => series(1, [null as any, 1]).toProperty().scan(add), + [null, 1], + unstable + ); + expectPropertyEvents( + () => series(1, [null as any, 2]).toProperty(1).scan(add), + [1, 1, 3], + unstable + ); + }); + describe("for synchronous source", function() { + it("with Init value, starts with f(seed, init)", () => + expectPropertyEvents( + () => fromArray([0,2,3]).toProperty(1).scan(add), + [1, 1, 3, 6], unstable) + ); + it("without Init value, starts with seed", () => + expectPropertyEvents( + () => fromArray([0,2,3]).toProperty().scan(add), + [0, 2, 5], unstable) + ); + it("works with synchronously responding empty source", () => + expectPropertyEvents( + () => Bacon.never().toProperty(1).scan(add), + [1], + unstable + ) + ); + }); + }); }); diff --git a/types/fold.d.ts b/types/fold.d.ts index 2f3055a5c..dd7329f41 100644 --- a/types/fold.d.ts +++ b/types/fold.d.ts @@ -7,3 +7,5 @@ import { Accumulator } from "./scan"; import { Property } from "./observable"; /** @hidden */ export default function fold(src: Observable, seed: Out, f: Accumulator): Property; +/** @hidden */ +export declare function foldSeedless(src: Observable, f: Accumulator): Property; diff --git a/types/observable.d.ts b/types/observable.d.ts index 9504d59cb..84599fee0 100644 --- a/types/observable.d.ts +++ b/types/observable.d.ts @@ -293,6 +293,7 @@ export declare abstract class Observable { [`Property`](property.html). */ fold(seed: V2, f: Accumulator): Property; + fold(f: Accumulator): Property; /** An alias for [onValue](#onvalue). @@ -439,6 +440,7 @@ export declare abstract class Observable { /** A synonym for [scan](#scan). */ reduce(seed: V2, f: Accumulator): Property; + reduce(f: Accumulator): Property; /** Creates an EventStream by sampling this stream/property value at each event from the `sampler` stream. The result @@ -479,6 +481,7 @@ export declare abstract class Observable { because there can only be 1 initial value for a Property at a time. */ scan(seed: V2, f: Accumulator): Property; + scan(f: Accumulator): Property; /** Skips the first n elements from the stream */ diff --git a/types/scan.d.ts b/types/scan.d.ts index 629e1abda..d622f708a 100644 --- a/types/scan.d.ts +++ b/types/scan.d.ts @@ -3,3 +3,5 @@ import { Property } from "./observable"; export declare type Accumulator = (acc: Out, value: In) => Out; /** @hidden */ export default function scan(src: Observable, seed: Out, f: Accumulator): Property; +/** @hidden */ +export declare function scanSeedless(src: Observable, f: Accumulator): Property;