Skip to content

Commit 059e6cc

Browse files
feat: support status change notifications
Closes #197
1 parent 1251944 commit 059e6cc

File tree

8 files changed

+624
-6
lines changed

8 files changed

+624
-6
lines changed
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
'use strict';
2+
const _ = require('lodash');
3+
const Joi = require('@hapi/joi');
4+
const Chance = require('chance');
5+
const BbPromise = require('bluebird');
6+
const schema = require('./compileNotifications.schema');
7+
8+
const chance = new Chance();
9+
10+
const executionStatuses = [
11+
'ABORTED', 'FAILED', 'RUNNING', 'SUCCEEDED', 'TIMED_OUT',
12+
];
13+
14+
function randomTargetId(stateMachineName, status) {
15+
const suffix = chance.string({
16+
length: 5,
17+
pool: 'abcdefghijklmnopqrstufwxyzABCDEFGHIJKLMNOPQRSTUFWXYZ1234567890',
18+
});
19+
20+
return `${stateMachineName}-${status}-${suffix}`;
21+
}
22+
23+
function compileTarget(stateMachineName, status, targetObj) {
24+
if (targetObj.sns) {
25+
return {
26+
Arn: targetObj.sns,
27+
Id: randomTargetId(stateMachineName, status),
28+
};
29+
} else if (targetObj.sqs && _.isString(targetObj.sqs)) {
30+
return {
31+
Arn: targetObj.sqs,
32+
Id: randomTargetId(stateMachineName, status),
33+
};
34+
} else if (targetObj.sqs) {
35+
return {
36+
Arn: targetObj.sqs.arn,
37+
Id: randomTargetId(stateMachineName, status),
38+
SqsParameters: {
39+
MessageGroupId: targetObj.sqs.messageGroupId,
40+
},
41+
};
42+
} else if (targetObj.kinesis && _.isString(targetObj.kinesis)) {
43+
return {
44+
Arn: targetObj.kinesis,
45+
Id: randomTargetId(stateMachineName, status),
46+
};
47+
} else if (targetObj.kinesis) {
48+
return {
49+
Arn: targetObj.kinesis.arn,
50+
Id: randomTargetId(stateMachineName, status),
51+
KinesisParameters: {
52+
PartitionKeyPath: targetObj.kinesis.partitionKeyPath,
53+
},
54+
};
55+
} else if (targetObj.firehose) {
56+
return {
57+
Arn: targetObj.firehose,
58+
Id: randomTargetId(stateMachineName, status),
59+
};
60+
} else if (targetObj.lambda) {
61+
return {
62+
Arn: targetObj.lambda,
63+
Id: randomTargetId(stateMachineName, status),
64+
};
65+
} else if (targetObj.stepFunctions) {
66+
return {
67+
Arn: targetObj.stepFunctions,
68+
Id: randomTargetId(stateMachineName, status),
69+
};
70+
}
71+
return undefined;
72+
}
73+
74+
function compileIamPermission(targetObj) {
75+
if (targetObj.sns) {
76+
return {
77+
action: 'sns:Publish',
78+
resource: targetObj.sns,
79+
};
80+
} else if (targetObj.sqs && _.isString(targetObj.sqs)) {
81+
return {
82+
action: 'sqs:SendMessage',
83+
resource: targetObj.sqs,
84+
};
85+
} else if (targetObj.sqs) {
86+
return {
87+
action: 'sqs:SendMessage',
88+
resource: targetObj.sqs.arn,
89+
};
90+
} else if (targetObj.kinesis && _.isString(targetObj.kinesis)) {
91+
return {
92+
action: 'kinesis:PutRecord',
93+
resource: targetObj.kinesis,
94+
};
95+
} else if (targetObj.kinesis) {
96+
return {
97+
action: 'kinesis:PutRecord',
98+
resource: targetObj.kinesis.arn,
99+
};
100+
} else if (targetObj.firehose) {
101+
return {
102+
action: 'firehose:PutRecord',
103+
resource: targetObj.firehose,
104+
};
105+
} else if (targetObj.lambda) {
106+
return {
107+
action: 'lambda:InvokeFunction',
108+
resource: targetObj.lambda,
109+
};
110+
} else if (targetObj.stepFunctions) {
111+
return {
112+
action: 'states:StartExecution',
113+
resource: targetObj.stepFunctions,
114+
};
115+
}
116+
117+
return undefined;
118+
}
119+
120+
function bootstrapIamRole() {
121+
const iamRole = {
122+
Type: 'AWS::IAM::Role',
123+
Properties: {
124+
AssumeRolePolicyDocument: {
125+
Statement: {
126+
Effect: 'Allow',
127+
Action: 'sts:AssumeRole',
128+
Principal: {
129+
Service: 'events.amazonaws.com',
130+
},
131+
},
132+
},
133+
Policies: [
134+
{
135+
PolicyName: 'root',
136+
PolicyDocument: {
137+
Version: '2012-10-17',
138+
Statement: [],
139+
},
140+
},
141+
],
142+
},
143+
};
144+
const addPermission = (action, resource) => {
145+
iamRole.Properties.Policies[0].PolicyDocument.Statement.push({
146+
Effect: 'Allow',
147+
Action: action,
148+
Resource: resource,
149+
});
150+
};
151+
152+
return { iamRole, addPermission };
153+
}
154+
155+
function* compileResources(stateMachineLogicalId, stateMachineName, notificationsObj) {
156+
const iamRoleLogicalId = `${stateMachineLogicalId}NotificationsIamRole`;
157+
const { iamRole, addPermission } = bootstrapIamRole();
158+
159+
for (const status of executionStatuses) {
160+
const targets = notificationsObj[status];
161+
if (!_.isEmpty(targets)) {
162+
const cfnTargets = targets
163+
.map(t => compileTarget(stateMachineName, status, t))
164+
.filter(_.isObjectLike);
165+
targets
166+
.map(compileIamPermission)
167+
.filter(_.isObjectLike)
168+
.forEach(({ action, resource }) => addPermission(action, resource));
169+
170+
const eventRuleLogicalId =
171+
`${stateMachineLogicalId}Notifications${status.replace('_', '')}EventRule`;
172+
const eventRule = {
173+
Type: 'AWS::Events::Rule',
174+
Properties: {
175+
Description: `[${status}] status notification for state machine [${stateMachineName}]`,
176+
EventPattern: {
177+
source: ['aws.states'],
178+
'detail-type': ['Step Functions Execution Status Change'],
179+
detail: {
180+
status: [status],
181+
},
182+
},
183+
Name: `${stateMachineName}-${status}-notification`,
184+
RoleArn: {
185+
'Fn::GetAtt': [iamRoleLogicalId, 'Arn'],
186+
},
187+
Targets: cfnTargets,
188+
},
189+
};
190+
yield [eventRuleLogicalId, eventRule];
191+
}
192+
}
193+
194+
if (!_.isEmpty(iamRole.Properties.Policies[0].PolicyDocument.Statement)) {
195+
yield [iamRoleLogicalId, iamRole];
196+
}
197+
}
198+
199+
function validateConfig(serverless, stateMachineName, notificationsObj) {
200+
// no notifications defined at all
201+
if (!_.isObject(notificationsObj)) {
202+
return false;
203+
}
204+
205+
const { error } = Joi.validate(
206+
notificationsObj, schema, { allowUnknown: false });
207+
208+
if (error) {
209+
serverless.cli.consoleLog(
210+
`State machine [${stateMachineName}] : notifications config is malformed. ` +
211+
'Please see https://github.com/horike37/serverless-step-functions for examples. ' +
212+
`${error}`);
213+
return false;
214+
}
215+
216+
return true;
217+
}
218+
219+
module.exports = {
220+
compileNotifications() {
221+
const newResourcePairs = _.flatMap(this.getAllStateMachines(), (name) => {
222+
const stateMachineObj = this.getStateMachine(name);
223+
const stateMachineLogicalId = this.getStateMachineLogicalId(name, stateMachineObj);
224+
const stateMachineName = stateMachineObj.name || name;
225+
const notificationsObj = stateMachineObj.notifications;
226+
227+
if (!validateConfig(this.serverless, stateMachineName, notificationsObj)) {
228+
return [];
229+
}
230+
231+
const resourcesIterator = compileResources(
232+
stateMachineLogicalId,
233+
stateMachineName,
234+
notificationsObj);
235+
236+
return Array.from(resourcesIterator);
237+
});
238+
const newResources = _.fromPairs(newResourcePairs);
239+
240+
_.merge(
241+
this.serverless.service.provider.compiledCloudFormationTemplate.Resources,
242+
newResources);
243+
return BbPromise.resolve();
244+
},
245+
};
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
const Joi = require('@hapi/joi');
2+
3+
const sqsWithParams = Joi.object().keys({
4+
arn: Joi.string().required(),
5+
messageGroupId: Joi.string().required(),
6+
});
7+
8+
const kinesisWithParams = Joi.object().keys({
9+
arn: Joi.string().required(),
10+
partitionKeyPath: Joi.string(),
11+
});
12+
13+
const target = Joi.object().keys({
14+
sns: Joi.string(),
15+
sqs: Joi.alternatives().try(sqsWithParams, Joi.string()),
16+
kinesis: Joi.alternatives().try(kinesisWithParams, Joi.string()),
17+
firehose: Joi.string(),
18+
lambda: Joi.string(),
19+
stepFunctions: Joi.string(),
20+
});
21+
22+
const targets = Joi.array().items(target);
23+
24+
const schema = Joi.object().keys({
25+
ABORTED: targets,
26+
FAILED: targets,
27+
RUNNING: targets,
28+
SUCCEEDED: targets,
29+
TIMED_OUT: targets,
30+
});
31+
32+
module.exports = schema;

0 commit comments

Comments
 (0)