Skip to content

Commit 26f4323

Browse files
authored
add email alerts with Python UDFs & Tasks (#8)
* first implementation * get it working * changes * improve email * add comments
1 parent 9374c4e commit 26f4323

File tree

2 files changed

+266
-0
lines changed

2 files changed

+266
-0
lines changed

Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ debug: ## Clean up everything and redeploy to fix bugs
8484
make deploy
8585
@echo "Debug cleanup and redeploy completed successfully."
8686

87+
alerts: ## Setup alerts
88+
@echo "Setting up alerts..."
89+
awslocal ses verify-email-identity --email [email protected]
90+
snow sql -f solutions/task_alert_udf.sql -c localstack
91+
@echo "Alerts setup successfully."
92+
8793
test: ## Run tests
8894
@echo "Running tests..."
8995
bash -c "source env/bin/activate && pytest tests/"

solutions/task_alert_udf.sql

Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
-- =====================================================
2+
-- SMART FACTORY AUTOMATED ALERTING SYSTEM
3+
-- =====================================================
4+
-- This demo showcases real-time critical machine alerting using:
5+
-- • Snowflake Tasks for automated scheduling
6+
-- • Python UDFs with SES email integration
7+
-- • Real-time database queries for critical machine detection
8+
--
9+
-- Target: FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics
10+
-- Alert Frequency: Every 30 seconds (configurable)
11+
-- Email Service: LocalStack SES
12+
-- =====================================================
13+
14+
-- Set context
15+
USE DATABASE FACTORY_PIPELINE_DEMO;
16+
USE SCHEMA PUBLIC;
17+
18+
-- =====================================================
19+
-- PART 1: EMAIL ALERT FUNCTION
20+
-- =====================================================
21+
-- Python UDF that sends professional email alerts via SES
22+
-- when critical machines are detected in the factory
23+
24+
-- UDF to send email report with critical machines data passed as parameter
25+
CREATE OR REPLACE FUNCTION send_critical_machines_report(critical_machines_json VARCHAR)
26+
RETURNS VARIANT
27+
LANGUAGE PYTHON
28+
RUNTIME_VERSION = 3.9
29+
PACKAGES = ('boto3')
30+
HANDLER = 'send_report'
31+
AS $$
32+
import boto3
33+
import json
34+
from datetime import datetime
35+
36+
def send_report(critical_machines_json):
37+
try:
38+
# LocalStack SES configuration for email delivery
39+
endpoint_url = "http://localhost:4566"
40+
41+
# Configure SES client for LocalStack
42+
ses_client = boto3.client(
43+
"ses",
44+
endpoint_url=endpoint_url,
45+
aws_access_key_id="test",
46+
aws_secret_access_key="test",
47+
region_name="us-east-1"
48+
)
49+
50+
# Email configuration - sender and recipient
51+
sender_email = "[email protected]"
52+
recipient_email = "[email protected]"
53+
54+
# Verify email identities (auto-verified in LocalStack)
55+
try:
56+
ses_client.verify_email_identity(EmailAddress=sender_email)
57+
ses_client.verify_email_identity(EmailAddress=recipient_email)
58+
except:
59+
pass # Identities might already be verified
60+
61+
# Parse the critical machines data from pipe-delimited string
62+
# Format: machine_id|risk_score|issue;machine_id|risk_score|issue;...
63+
critical_machines = []
64+
try:
65+
if critical_machines_json and critical_machines_json.strip():
66+
machine_entries = critical_machines_json.split(';')
67+
for entry in machine_entries:
68+
if entry.strip():
69+
parts = entry.split('|')
70+
if len(parts) >= 3:
71+
critical_machines.append({
72+
"machine_id": parts[0],
73+
"risk_score": float(parts[1]) if parts[1] else 0.0,
74+
"issue": parts[2] if parts[2] else "Immediate maintenance required"
75+
})
76+
except Exception as e:
77+
# Fallback to empty list if parsing fails
78+
critical_machines = []
79+
80+
# Exit early if no critical machines found
81+
if not critical_machines:
82+
return {
83+
"status": "success",
84+
"message": "No critical machines found",
85+
"email_sent": False,
86+
"timestamp": datetime.now().isoformat()
87+
}
88+
89+
# Create professional email content
90+
subject = f"CRITICAL ALERT: {len(critical_machines)} Machines Require Immediate Attention"
91+
92+
# Plain text version for email clients that don't support HTML
93+
body_text = f"""CRITICAL MACHINES ALERT REPORT
94+
95+
Total Critical Machines: {len(critical_machines)}
96+
Report Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
97+
98+
CRITICAL MACHINES:
99+
"""
100+
101+
for machine in critical_machines:
102+
body_text += f"""
103+
- Machine ID: {machine['machine_id']}
104+
Risk Score: {machine['risk_score']}%
105+
Issue: {machine['issue']}
106+
"""
107+
108+
body_text += """
109+
Please take immediate action to prevent equipment failure.
110+
111+
---
112+
Smart Factory Health Monitor
113+
Powered by LocalStack + Snowflake
114+
"""
115+
116+
# Professional HTML version with styling
117+
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
118+
machine_count = len(critical_machines)
119+
120+
body_html = """
121+
<html>
122+
<head></head>
123+
<body>
124+
<h2 style="color: #d32f2f;">CRITICAL MACHINES ALERT REPORT</h2>
125+
126+
<div style="background-color: #ffebee; padding: 15px; border-left: 4px solid #d32f2f; margin: 10px 0;">
127+
<h3>Summary</h3>
128+
<ul>
129+
<li><strong>Total Critical Machines:</strong> """ + str(machine_count) + """</li>
130+
<li><strong>Report Generated:</strong> """ + current_time + """</li>
131+
</ul>
132+
</div>
133+
134+
<h3>Critical Machines Details:</h3>
135+
<table style="border-collapse: collapse; width: 100%;">
136+
<tr style="background-color: #f5f5f5;">
137+
<th style="border: 1px solid #ddd; padding: 8px;">Machine ID</th>
138+
<th style="border: 1px solid #ddd; padding: 8px;">Risk Score</th>
139+
<th style="border: 1px solid #ddd; padding: 8px;">Issue</th>
140+
</tr>
141+
"""
142+
143+
# Add each critical machine to the HTML table
144+
for machine in critical_machines:
145+
body_html += """
146+
<tr>
147+
<td style="border: 1px solid #ddd; padding: 8px; font-weight: bold;">""" + str(machine['machine_id']) + """</td>
148+
<td style="border: 1px solid #ddd; padding: 8px; color: #d32f2f;">""" + str(machine['risk_score']) + """%</td>
149+
<td style="border: 1px solid #ddd; padding: 8px;">""" + str(machine['issue']) + """</td>
150+
</tr>
151+
"""
152+
153+
body_html += """
154+
</table>
155+
156+
<p><strong>Please take immediate action to prevent equipment failure.</strong></p>
157+
158+
<hr>
159+
<p style="font-size: 12px; color: #666;">
160+
Smart Factory Health Monitor<br>
161+
Powered by LocalStack + Snowflake
162+
</p>
163+
</body>
164+
</html>
165+
"""
166+
167+
# Send the email via LocalStack SES
168+
response = ses_client.send_email(
169+
Source=sender_email,
170+
Destination={
171+
'ToAddresses': [recipient_email]
172+
},
173+
Message={
174+
'Subject': {
175+
'Data': subject,
176+
'Charset': 'UTF-8'
177+
},
178+
'Body': {
179+
'Text': {
180+
'Data': body_text,
181+
'Charset': 'UTF-8'
182+
},
183+
'Html': {
184+
'Data': body_html,
185+
'Charset': 'UTF-8'
186+
}
187+
}
188+
}
189+
)
190+
191+
# Return success response with email details
192+
return {
193+
"status": "success",
194+
"total_critical_machines": len(critical_machines),
195+
"email_sent": True,
196+
"message_id": response.get('MessageId'),
197+
"recipient": recipient_email,
198+
"sender": sender_email,
199+
"timestamp": datetime.now().isoformat()
200+
}
201+
202+
except Exception as e:
203+
# Return error response if email sending fails
204+
return {
205+
"status": "error",
206+
"error": str(e),
207+
"email_sent": False,
208+
"timestamp": datetime.now().isoformat()
209+
}
210+
$$;
211+
212+
-- =====================================================
213+
-- PART 2: DATA QUERY VIEW
214+
-- =====================================================
215+
-- View that queries critical machines and formats data for the UDF
216+
-- Uses LISTAGG to create pipe-delimited string format
217+
218+
CREATE OR REPLACE VIEW critical_machines_list AS
219+
SELECT
220+
LISTAGG(
221+
machine_id || '|' || failure_risk_score || '|' ||
222+
CASE WHEN maintenance_recommendation IS NULL THEN 'Immediate maintenance required'
223+
ELSE maintenance_recommendation END,
224+
';'
225+
) as machines_data
226+
FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics
227+
WHERE health_status = 'CRITICAL';
228+
229+
-- =====================================================
230+
-- PART 3: AUTOMATED TASK SCHEDULER
231+
-- =====================================================
232+
-- Snowflake Task that runs every 30 seconds to check for critical machines
233+
-- and automatically sends email alerts when found
234+
235+
CREATE OR REPLACE TASK automated_critical_alert_task
236+
WAREHOUSE = 'test'
237+
SCHEDULE = '30 SECONDS'
238+
AS
239+
SELECT
240+
CASE
241+
WHEN machines_data IS NULL OR machines_data = '' THEN
242+
FACTORY_PIPELINE_DEMO.PUBLIC.send_critical_machines_report('')
243+
ELSE
244+
FACTORY_PIPELINE_DEMO.PUBLIC.send_critical_machines_report(machines_data)
245+
END
246+
FROM FACTORY_PIPELINE_DEMO.PUBLIC.critical_machines_list;
247+
248+
-- =====================================================
249+
-- PART 4: ACTIVATE THE SYSTEM
250+
-- =====================================================
251+
-- Resume the task to start automated alert monitoring
252+
253+
ALTER TASK automated_critical_alert_task RESUME;
254+
255+
-- =====================================================
256+
-- PART 5: SYSTEM STATUS CHECK
257+
-- =====================================================
258+
-- Verify the automated alerting system is running
259+
260+
SHOW TASKS LIKE 'automated_critical_alert_task';

0 commit comments

Comments
 (0)