Skip to content

Commit 06b09a2

Browse files
authored
Add Apache Spark receiver (#5318)
* Add apachespark receiver * Add more docker contents * Update CHANGELOG.md * Fix formatting * Fix golden file test format * rename test file * Ignore scope version in testing * Update internal/components/components.go * Fix fmt error
1 parent 1fbb427 commit 06b09a2

File tree

12 files changed

+1420
-0
lines changed

12 files changed

+1420
-0
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
- (Splunk) Update Python to 3.12.5 in the Smart Agent bundle for Linux and Windows. Check [What’s New In Python 3.12](https://docs.python.org/3/whatsnew/3.12.html) for details. ([#5298](https://github.com/signalfx/splunk-otel-collector/pull/5298))
88

9+
### 🚀 New components 🚀
10+
11+
- (Splunk) Add `apachespark` receiver ([#5318](https://github.com/signalfx/splunk-otel-collector/pull/5318))
12+
913
## v0.108.1
1014

1115
This Splunk OpenTelemetry Collector release includes changes from the [opentelemetry-collector v0.108.1](https://github.com/open-telemetry/opentelemetry-collector/releases/tag/v0.108.1) and the [opentelemetry-collector-contrib v0.108.0](https://github.com/open-telemetry/opentelemetry-collector-contrib/releases/tag/v0.108.0) releases where appropriate.

docker/apachespark/Dockerfile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Test image for the apachespark receiver: starts from the apache/spark
# 3.4.0-python3 image, adds long_running.py to the bundled examples directory,
# and runs it through spark-submit as the default command.
# Port 4040 is exposed; docker-compose.yml publishes it as "4040:4040".
FROM apache/spark:3.4.0-python3
2+
3+
COPY ./long_running.py /opt/spark/examples/src/main/python/long_running.py
4+
RUN chmod +x /opt/spark/examples/src/main/python/long_running.py
5+
6+
CMD [ "/opt/spark/bin/spark-submit", "/opt/spark/examples/src/main/python/long_running.py" ]
7+
8+
EXPOSE 4040

docker/apachespark/long_running.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# Keep the original spark license.
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one or more
4+
# contributor license agreements. See the NOTICE file distributed with
5+
# this work for additional information regarding copyright ownership.
6+
# The ASF licenses this file to You under the Apache License, Version 2.0
7+
# (the "License"); you may not use this file except in compliance with
8+
# the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
# Original source: https://github.com/apache/spark/blob/master/examples/src/main/python/status_api_demo.py
19+
#
20+
# NOTICE:
21+
# Apache Spark
22+
# Copyright 2014 and onwards The Apache Software Foundation.
23+
#
24+
# This product includes software developed at
25+
# The Apache Software Foundation (http://www.apache.org/).
26+
#
27+
28+
import time
29+
import threading
30+
import queue as Queue
31+
from typing import Any, Callable, List, Tuple
32+
33+
from pyspark import SparkConf, SparkContext
34+
35+
36+
def delayed(seconds: float) -> Callable[[Any], Any]:
    """Build an identity function that sleeps before returning its argument.

    Args:
        seconds: How long the returned function blocks (via ``time.sleep``)
            on every call. Whole and fractional seconds are both accepted.

    Returns:
        A one-argument callable that waits ``seconds`` and then returns its
        input unchanged.
    """
    # main() maps the returned function over plain ints and over
    # (key, count) tuples, so it is typed Any -> Any to match the
    # Callable[[Any], Any] promised by the outer signature.
    def f(x: Any) -> Any:
        time.sleep(seconds)
        return x

    return f
41+
42+
43+
def call_in_background(f: Callable[..., Any], *args: Any) -> Queue.Queue:
    """Run ``f(*args)`` on a daemon thread and return a queue for its outcome.

    Args:
        f: The callable to execute in the background.
        *args: Positional arguments forwarded to ``f``.

    Returns:
        A queue of capacity 1 that receives exactly one item: the return
        value of ``f(*args)``, or, if the call raised, the exception
        instance itself. Callers that need to tell the two apart should
        check ``isinstance(item, BaseException)``.
    """
    result: Queue.Queue = Queue.Queue(1)

    def worker() -> None:
        # Always put something on the queue. If a failure left it empty,
        # anyone polling result.empty() or blocking on result.get() would
        # wait forever.
        try:
            outcome = f(*args)
        except Exception as exc:
            outcome = exc
        result.put(outcome)

    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()
    return result
49+
50+
51+
def main() -> None:
    """Run one demo Spark job while printing its progress once per second.

    Creates a SparkContext, starts a small map/reduceByKey/map job in a
    background thread, and polls the context's status tracker until the
    background call has delivered its result. The result is then printed.
    The SparkContext is stopped on every exit path.
    """
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run() -> List[Tuple[int, int]]:
        # delayed(2) makes each element take two seconds, so the job stays
        # active long enough to be observed.
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    try:
        result = call_in_background(run)
        status = sc.statusTracker()
        while result.empty():
            for job_id in status.getJobIdsForGroup():
                job = status.getJobInfo(job_id)
                if job is None:
                    # The tracker may no longer know this job; skip it
                    # rather than abort the whole polling loop.
                    continue

                print("Job", job_id, "status: ", job.status)
                for sid in job.stageIds:
                    info = status.getStageInfo(sid)
                    if info:
                        print("Stage %d: %d tasks total (%d active, %d complete)" %
                              (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))
            time.sleep(1)

        print("Job results are:", result.get())
    finally:
        # Release the context even if polling or printing raised.
        sc.stop()
78+
79+
80+
if __name__ == "__main__":
    # Run the demo 100 times back to back; every main() call creates and
    # stops its own SparkContext. The loop counter itself is not needed.
    for _ in range(100):
        main()

docker/docker-compose.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ services:
1515
build: ./apache
1616
ports:
1717
- "18080:80"
18+
apachespark:
19+
image: quay.io/splunko11ytest/apachespark:latest
20+
profiles:
21+
- integration
22+
build: ./apachespark
23+
ports:
24+
- "4040:4040"
1825
# Cassandra image for collectd-cassandra test:
1926
cassandra:
2027
image: quay.io/splunko11ytest/cassandra:latest

docs/components.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ The distribution offers support for the following components.
1515
|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-----------------|
1616
| [active_directory_ds](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/activedirectorydsreceiver) | [beta] |
1717
| [apache](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/apachereceiver) | [alpha] |
18+
| [apachespark](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/apachesparkreceiver) | [alpha] |
1819
| [awscontainerinsights](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/awscontainerinsightreceiver) | [beta] |
1920
| [awsecscontainermetrics](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/awsecscontainermetricsreceiver) | [beta] |
2021
| [azureblob](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/azureblobreceiver) | [alpha] |

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ require (
6262
github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.108.0
6363
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/activedirectorydsreceiver v0.108.0
6464
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachereceiver v0.108.0
65+
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver v0.108.0
6566
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.108.0
6667
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsecscontainermetricsreceiver v0.108.0
6768
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver v0.108.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1394,6 +1394,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/receiver/activedirecto
13941394
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/activedirectorydsreceiver v0.108.0/go.mod h1:Zn5FJSJ2h6DUw84ZqwPiJFCXZWsEeBo++SV3xNeSuO4=
13951395
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachereceiver v0.108.0 h1:RFB7p/HUt46C4HDZjwwDDYYv+ipn/SVTpbuLgXzEbZQ=
13961396
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachereceiver v0.108.0/go.mod h1:7o6+Ojnd9enSTTVUfzORsgMT83qy0Ds0rNl6CXJb5cQ=
1397+
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver v0.108.0 h1:alseMqI1EDNxMBxfBMQde3r0g/KUauEWbm/u6vKzjnM=
1398+
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver v0.108.0/go.mod h1:GgM4G2KU5/CYVKUpnJkTsud8B6OvOlnNm6gz3KjsuwU=
13971399
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.108.0 h1:GQPIFe/91bnSymg3/4cfW6kbQD6+VMCX8yPzgaVGoaQ=
13981400
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.108.0/go.mod h1:BK/JiWZQtGM94UOwa+JWgI6ABUtJExPvM82uTQPXOns=
13991401
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsecscontainermetricsreceiver v0.108.0 h1:4/SyCI9608uBuk0qkl7BWboXxWlArSys/qHBU/Wk4fQ=

internal/components/components.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import (
5757
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
5858
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/activedirectorydsreceiver"
5959
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachereceiver"
60+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver"
6061
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver"
6162
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsecscontainermetricsreceiver"
6263
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"
@@ -164,6 +165,7 @@ func Get() (otelcol.Factories, error) {
164165
receivers, err := receiver.MakeFactoryMap(
165166
activedirectorydsreceiver.NewFactory(),
166167
apachereceiver.NewFactory(),
168+
apachesparkreceiver.NewFactory(),
167169
awscontainerinsightreceiver.NewFactory(),
168170
awsecscontainermetricsreceiver.NewFactory(),
169171
azureblobreceiver.NewFactory(),

internal/components/components_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func TestDefaultComponents(t *testing.T) {
4646
expectedReceivers := []string{
4747
"active_directory_ds",
4848
"apache",
49+
"apachespark",
4950
"awscontainerinsightreceiver",
5051
"awsecscontainermetrics",
5152
"azureblob",
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright Splunk, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//go:build integration
16+
17+
package tests
18+
19+
import (
20+
"testing"
21+
22+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
23+
24+
"github.com/signalfx/splunk-otel-collector/tests/testutils"
25+
)
26+
27+
const sparkPort = "4040"
28+
29+
// TestApacheSparkIntegration passes all_metrics_config.yaml and
// all_expected_metrics.yaml to testutils.CheckGoldenFile. The comparison
// options ignore metric values, timestamps and start timestamps, the scope
// version, the values of the spark.application.id and spark.application.name
// resource attributes, the values of the active/complete/failed/pending
// attributes on spark.stage.status, and the order of data points.
func TestApacheSparkIntegration(t *testing.T) {
30+
testutils.CheckGoldenFile(t, "all_metrics_config.yaml", "all_expected_metrics.yaml",
31+
pmetrictest.IgnoreMetricValues(),
32+
pmetrictest.IgnoreStartTimestamp(),
33+
pmetrictest.IgnoreTimestamp(),
34+
pmetrictest.IgnoreScopeVersion(),
35+
pmetrictest.IgnoreResourceAttributeValue("spark.application.id"),
36+
pmetrictest.IgnoreResourceAttributeValue("spark.application.name"),
37+
pmetrictest.IgnoreMetricAttributeValue("active", "spark.stage.status"),
38+
pmetrictest.IgnoreMetricAttributeValue("complete", "spark.stage.status"),
39+
pmetrictest.IgnoreMetricAttributeValue("failed", "spark.stage.status"),
40+
pmetrictest.IgnoreMetricAttributeValue("pending", "spark.stage.status"),
41+
pmetrictest.IgnoreMetricDataPointsOrder(),
42+
)
43+
}

0 commit comments

Comments
 (0)