Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,19 @@ const map = new sfn.Map(this, 'Map State', {
});
```

When using `JSONata`, you can also specify the `maxConcurrency` dynamically using a JSONata expression with the `jsonataMaxConcurrency` property. This allows you to determine the concurrency limit based on state input or other runtime values:

```ts
const map = new sfn.Map(this, 'Map State', {
jsonataMaxConcurrency: '{% $states.input.maxConcurrency %}',
itemSelector: {
item: '{% $states.context.Map.Item.Value %}',
}
});
```

Note that `jsonataMaxConcurrency` is mutually exclusive with `maxConcurrency` and `maxConcurrencyPath`.

To define a distributed `Map` state set `itemProcessors` mode to `ProcessorMode.DISTRIBUTED`.
An `executionType` must be specified for the distributed `Map` workflow.

Expand Down
31 changes: 31 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions/lib/states/map-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ export interface MapBaseOptions extends AssignableStateOptions {
*/
readonly maxConcurrency?: number;

/**
* JSONata expression for MaxConcurrency
*
* A JSONata expression that evaluates to an integer, specifying the maximum
* concurrency dynamically. Mutually exclusive with `maxConcurrency` and
* `maxConcurrencyPath`.
*
* Example value: `{% $states.input.maxConcurrency %}`
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/concepts-asl-use-map-state-inline.html#map-state-inline-additional-fields
*
* @default - full concurrency
*/
readonly jsonataMaxConcurrency?: string;

/**
* The JSON that you want to override your default iteration input (mutually exclusive with `parameters` and `jsonataItemSelector`).
*
Expand Down Expand Up @@ -157,6 +173,7 @@ export abstract class MapBase extends State implements INextable {

private readonly maxConcurrency?: number;
private readonly maxConcurrencyPath?: string;
private readonly jsonataMaxConcurrency?: string;
protected readonly items?: ProvideItems;
protected readonly itemsPath?: string;
protected readonly itemSelector?: { [key: string]: any };
Expand All @@ -167,6 +184,7 @@ export abstract class MapBase extends State implements INextable {
this.endStates = [this];
this.maxConcurrency = props.maxConcurrency;
this.maxConcurrencyPath = props.maxConcurrencyPath;
this.jsonataMaxConcurrency = props.jsonataMaxConcurrency;
this.items = props.items;
this.itemsPath = props.itemsPath;
this.itemSelector = props.itemSelector;
Expand Down Expand Up @@ -200,6 +218,7 @@ export abstract class MapBase extends State implements INextable {
...this.renderItemProcessor(),
...(this.maxConcurrency && { MaxConcurrency: this.maxConcurrency }),
...(this.maxConcurrencyPath && { MaxConcurrencyPath: renderJsonPath(this.maxConcurrencyPath) }),
...(this.jsonataMaxConcurrency && { MaxConcurrency: this.jsonataMaxConcurrency }),
...this.renderAssign(topLevelQueryLanguage),
};
}
Expand All @@ -222,6 +241,18 @@ export abstract class MapBase extends State implements INextable {
errors.push('Provide either `maxConcurrency` or `maxConcurrencyPath`, but not both');
}

if (this.jsonataMaxConcurrency && this.maxConcurrency) {
errors.push('Provide either `maxConcurrency` or `jsonataMaxConcurrency`, but not both');
}

if (this.jsonataMaxConcurrency && this.maxConcurrencyPath) {
errors.push('Provide either `maxConcurrencyPath` or `jsonataMaxConcurrency`, but not both');
}

if (this.jsonataMaxConcurrency && !isValidJsonataExpression(this.jsonataMaxConcurrency)) {
errors.push('The `jsonataMaxConcurrency` property must be a valid JSONata expression');
}

if (this.itemSelector && this.jsonataItemSelector) {
errors.push('Provide either `itemSelector` or `jsonataItemSelector`, but not both');
}
Expand Down
82 changes: 82 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions/test/map.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,88 @@ describe('Map State', () => {
items: '{% $items %}',
});
});

test('State Machine With Map State and jsonataMaxConcurrency', () => {
// GIVEN
const stack = new cdk.Stack();

// WHEN
const map = new stepfunctions.Map(stack, 'Map State', {
stateName: 'My-Map-State',
jsonataMaxConcurrency: '{% $states.input.maxConcurrency %}',
itemsPath: stepfunctions.JsonPath.stringAt('$.inputForMap'),
});
map.itemProcessor(new stepfunctions.Pass(stack, 'Pass State'));

// THEN
expect(render(map)).toStrictEqual({
StartAt: 'My-Map-State',
States: {
'My-Map-State': {
Type: 'Map',
End: true,
ItemProcessor: {
ProcessorConfig: {
Mode: 'INLINE',
},
StartAt: 'Pass State',
States: {
'Pass State': {
Type: 'Pass',
End: true,
},
},
},
ItemsPath: '$.inputForMap',
MaxConcurrency: '{% $states.input.maxConcurrency %}',
},
},
});
});

test('fails in synthesis if jsonataMaxConcurrency is not a valid JSONata expression', () => {
const app = createAppWithMap((stack) => {
const map = new stepfunctions.Map(stack, 'Map State', {
jsonataMaxConcurrency: 'Invalid expression',
itemsPath: stepfunctions.JsonPath.stringAt('$.inputForMap'),
});
map.itemProcessor(new stepfunctions.Pass(stack, 'Pass State'));

return map;
});

expect(() => app.synth()).toThrow(/The `jsonataMaxConcurrency` property must be a valid JSONata expression/);
});

test('fails in synthesis when maxConcurrency and jsonataMaxConcurrency are both defined', () => {
const app = createAppWithMap((stack) => {
const map = new stepfunctions.Map(stack, 'Map State', {
maxConcurrency: 10,
jsonataMaxConcurrency: '{% $states.input.maxConcurrency %}',
itemsPath: stepfunctions.JsonPath.stringAt('$.inputForMap'),
});
map.itemProcessor(new stepfunctions.Pass(stack, 'Pass State'));

return map;
});

expect(() => app.synth()).toThrow(/Provide either `maxConcurrency` or `jsonataMaxConcurrency`, but not both/);
});

test('fails in synthesis when maxConcurrencyPath and jsonataMaxConcurrency are both defined', () => {
const app = createAppWithMap((stack) => {
const map = new stepfunctions.Map(stack, 'Map State', {
maxConcurrencyPath: stepfunctions.JsonPath.stringAt('$.maxConcurrencyPath'),
jsonataMaxConcurrency: '{% $states.input.maxConcurrency %}',
itemsPath: stepfunctions.JsonPath.stringAt('$.inputForMap'),
});
map.itemProcessor(new stepfunctions.Pass(stack, 'Pass State'));

return map;
});

expect(() => app.synth()).toThrow(/Provide either `maxConcurrencyPath` or `jsonataMaxConcurrency`, but not both/);
});
});

function render(sm: stepfunctions.IChainable) {
Expand Down
Loading