Skip to content

Commit 3d5646b

Browse files
author
Christian Herrera
committed
Add practical exercise lesson 3
1 parent e4b72ed commit 3d5646b

File tree

4 files changed

+122
-1
lines changed

4 files changed

+122
-1
lines changed

project/Dependencies.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ object Dependencies {
1313
"com.rabbitmq" % "amqp-client" % "5.21.0",
1414
"com.typesafe" % "config" % "1.4.1",
1515
"io.delta" %% "delta-spark" % "3.1.0",
16-
"io.spray" %% "spray-json" % "1.3.6"
16+
"io.spray" %% "spray-json" % "1.3.6",
17+
"io.circe" %% "circe-core" % "0.14.7",
18+
"io.circe" %% "circe-generic" % "0.14.9",
19+
"io.circe" %% "circe-parser" % "0.14.9"
1720
)
1821
private val test = Seq(
1922
"org.scalatest" %% "scalatest" % "3.2.19",
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
aws events put-events \
2+
--endpoint-url http://localhost:4566 \
3+
--region us-east-1 \
4+
--entries '[{
5+
"EventBusName": "codely.domain_events",
6+
"Source": "codely",
7+
"DetailType": "user.registered",
8+
"Detail": "{ \"detail_type\": \"userRegistered\", \"user_id\": \"123\", \"email\": \"[email protected]\", \"timestamp\": \"2023-07-21T10:00:00Z\" }"
9+
}, {
10+
"EventBusName": "codely.domain_events",
11+
"Source": "codely",
12+
"DetailType": "user.registered",
13+
"Detail": "{ \"detail_type\": \"userRegistered\", \"user_id\": \"124\", \"email\": \"[email protected]\", \"timestamp\": \"2023-07-21T10:01:00Z\" }"
14+
}, {
15+
"EventBusName": "codely.domain_events",
16+
"Source": "codely",
17+
"DetailType": "user.registered",
18+
"Detail": "{ \"detail_type\": \"other\", \"user_id\": \"125\", \"email\": \"[email protected]\", \"timestamp\": \"2023-07-21T10:02:00Z\" }"
19+
}]'
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.codely.lesson_03_spark_streaming_sqs.z_practical_exercise
2+
3+
import io.circe.generic.auto._
4+
import io.circe.parser._
5+
import org.apache.spark.SparkConf
6+
import org.apache.spark.streaming.{Seconds, StreamingContext}
7+
import org.apache.spark.streaming.dstream.DStream
8+
9+
object SQSReceiverSparkApp extends App {
10+
private val sqsEndpoint = "http://localhost:4566"
11+
private val region = "us-east-1"
12+
private val queueUrl = "http://localhost:4566/000000000000/send_welcome_email_on_user_registered"
13+
14+
val conf = new SparkConf().setAppName("SQSReceiverSparkApp").setMaster("local[*]")
15+
16+
val ssc = new StreamingContext(conf, Seconds(5))
17+
18+
val receiver = new SQSSparkReceiver(sqsEndpoint, region, queueUrl)
19+
20+
val messages: DStream[String] = ssc.receiverStream(receiver)
21+
22+
case class Detail(detail_type: String, user_id: String, email: String, timestamp: String)
23+
case class Event(detail: Detail)
24+
25+
val filteredMessages = messages.flatMap { message =>
26+
decode[Event](message) match {
27+
case Right(event) if event.detail.detail_type == "userRegistered" => Some(event)
28+
case _ => None
29+
}
30+
}
31+
32+
val eventsDStream = filteredMessages.map { event =>
33+
(event.detail.user_id, event.detail.timestamp)
34+
}
35+
36+
val windowedCounts = eventsDStream
37+
.map { case (userId, _) => (userId, 1) }
38+
.reduceByKeyAndWindow(_ + _, Seconds(300))
39+
40+
windowedCounts.print()
41+
42+
ssc.start()
43+
ssc.awaitTermination()
44+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.codely.lesson_03_spark_streaming_sqs.z_practical_exercise
2+
3+
import com.amazonaws.client.builder.AwsClientBuilder
4+
import com.amazonaws.services.sqs.model.{DeleteMessageRequest, ReceiveMessageRequest}
5+
import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
6+
import org.apache.spark.storage.StorageLevel
7+
import org.apache.spark.streaming.receiver.Receiver
8+
9+
import scala.collection.JavaConverters._
10+
11+
class SQSSparkReceiver(endpoint: String, region: String, queueUrl: String)
12+
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
13+
14+
private var sqsClient: AmazonSQS = _
15+
16+
override def onStart(): Unit = {
17+
18+
sqsClient = AmazonSQSClientBuilder
19+
.standard()
20+
.withEndpointConfiguration(
21+
new AwsClientBuilder.EndpointConfiguration(
22+
endpoint,
23+
region
24+
)
25+
)
26+
.build()
27+
28+
new Thread("SQS Receiver") {
29+
override def run() {
30+
receive()
31+
}
32+
}.start()
33+
}
34+
35+
override def onStop(): Unit = {
36+
// Any necessary cleanup
37+
}
38+
39+
private def receive(): Unit = {
40+
while (!isStopped()) {
41+
val request = new ReceiveMessageRequest(queueUrl)
42+
.withMaxNumberOfMessages(10)
43+
.withWaitTimeSeconds(20)
44+
45+
val messages = sqsClient.receiveMessage(request).getMessages.asScala
46+
47+
for (message <- messages) {
48+
store(message.getBody)
49+
val deleteRequest =
50+
new DeleteMessageRequest(queueUrl, message.getReceiptHandle)
51+
sqsClient.deleteMessage(deleteRequest)
52+
}
53+
}
54+
}
55+
}

0 commit comments

Comments
 (0)