
Commit 110d050

Merge pull request #183 from smacker/add_aws_s3_support

add support for Amazon Web Services S3

2 parents: 90587da + 3258f5d

File tree: 3 files changed (+46, -9 lines)


build.sbt

Lines changed: 2 additions & 1 deletion
@@ -23,6 +23,7 @@ libraryDependencies ++= Seq(
   engine % Compile,
   jgit % Compile,
   gcs % Compile,
+  hadoopAws % Compile,
   fixNetty,
   cassandraDriverMetrics % Provided, // needed for using the driver w/o Spark from SparkConnector
   cassandraSparkConnector % Compile,
@@ -34,7 +35,6 @@ libraryDependencies ++= Seq(
   bblfshClient % Compile,
   avro % Compile,
   parquetAvro % Compile,
-  hadoopCommon % Compile,
   scalaJsonParser % Compile // needed for docFreq reading
   //TODO(bzz): remove scalaJsonParser at https://github.com/src-d/gemini/issues/112
 )
@@ -44,6 +44,7 @@ assemblyJarName in assemblyPackageDependency := s"${name.value}-deps.jar"

 assemblyMergeStrategy in assembly := {
   case "META-INF/io.netty.versions.properties" => MergeStrategy.last
+  case "mime.types" => MergeStrategy.last
   case x =>
     val oldStrategy = (assemblyMergeStrategy in assembly).value
     oldStrategy(x)
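
Why the extra merge rule: hadoop-aws pulls in the AWS Java SDK, and with it more than one jar on the assembly classpath ships a mime.types resource. sbt-assembly's default deduplicate strategy then fails the fat-jar build on the conflicting copies, so MergeStrategy.last keeps a single (interchangeable) copy. A quick way to check, assuming the project's standard sbt-assembly tasks:

    # Rebuild both fat jars; without the "mime.types" rule above this aborts
    # with an sbt-assembly error of the form
    #   deduplicate: different file contents found in the following: ... mime.types
    sbt assembly assemblyPackageDependency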

project/Dependencies.scala

Lines changed: 6 additions & 4 deletions
@@ -18,10 +18,12 @@ object Dependencies {
   lazy val log4jBinding = "org.slf4j" % "slf4j-log4j12" % "1.7.25"
   lazy val avro = "org.apache.avro" % "avro" % "1.7.7"
   lazy val parquetAvro = "org.apache.parquet" % "parquet-avro" % "1.9.0"
-  // same version as in spark and exclude conflicting dependencies with the cassandra driver
-  // similar to:
-  // https://github.com/datastax/spark-cassandra-connector/blob/v2.0.6/project/SparkCassandraConnectorBuild.scala#L184
-  lazy val hadoopCommon = ("org.apache.hadoop" % "hadoop-common" % "2.6.5")
+  // version compatible with Spark 2.2
+  // hadoop-aws brings the hadoop-common package with it, which gemini itself needs
+  lazy val hadoopAws = ("org.apache.hadoop" % "hadoop-aws" % "2.7.2")
+  // excludes dependencies that conflict with the cassandra driver
+  // similar to:
+  // https://github.com/datastax/spark-cassandra-connector/blob/v2.0.6/project/SparkCassandraConnectorBuild.scala#L184
     .exclude("com.sun.jersey", "jersey-server")
     .exclude("commons-beanutils", "commons-beanutils-core")
   lazy val scalapb = "com.thesamet.scalapb" %% "scalapb-runtime" % "0.8.4"
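
Pieced together, the declaration this hunk produces reads as follows; this is just the post-patch state of project/Dependencies.scala restated without diff markers:

    // Version compatible with Spark 2.2. hadoop-aws transitively brings in
    // hadoop-common, which gemini itself needs, so the explicit hadoop-common
    // dependency is dropped.
    lazy val hadoopAws = ("org.apache.hadoop" % "hadoop-aws" % "2.7.2")
      // Exclude dependencies that conflict with the Cassandra driver, similar to:
      // https://github.com/datastax/spark-cassandra-connector/blob/v2.0.6/project/SparkCassandraConnectorBuild.scala#L184
      .exclude("com.sun.jersey", "jersey-server")
      .exclude("commons-beanutils", "commons-beanutils-core")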

src/main/scala/tech/sourced/gemini/cmd/HashSparkApp.scala

Lines changed: 38 additions & 4 deletions
@@ -25,8 +25,11 @@ case class HashAppConfig(
   docFreqFile: String = "",
   verbose: Boolean = false,
   mode: String = Gemini.fileSimilarityMode,
+  replace: Boolean = false,
   gcsKeyFile: String = "",
-  replace: Boolean = false
+  awsKey: String = "",
+  awsSecret: String = "",
+  awsS3Endpoint: String = ""
 )
 
 /**
@@ -96,12 +99,21 @@ object HashSparkApp extends App with Logging {
     opt[String]("doc-freq-file")
       .action((x, c) => c.copy(docFreqFile = x))
      .text("path to file with feature frequencies")
-    opt[String]("gcs-keyfile")
-      .action((x, c) => c.copy(gcsKeyFile = x))
-      .text("path to JSON keyfile for authentication in Google Cloud Storage")
     opt[Unit]("replace")
       .action((x, c) => c.copy(replace = true))
       .text("replace results of previous hashing")
+    opt[String]("gcs-keyfile")
+      .action((x, c) => c.copy(gcsKeyFile = x))
+      .text("path to JSON keyfile for authentication in Google Cloud Storage")
+    opt[String]("aws-key")
+      .action((x, c) => c.copy(awsKey = x))
+      .text("AWS access key")
+    opt[String]("aws-secret")
+      .action((x, c) => c.copy(awsSecret = x))
+      .text("AWS secret access key")
+    opt[String]("aws-s3-endpoint")
+      .action((x, c) => c.copy(awsS3Endpoint = x))
+      .text("regional S3 endpoint")
     arg[String]("<path-to-git-repos>")
       .required()
       .action((x, c) => c.copy(reposPath = x))
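
With these options in place, a hashing run against an S3 bucket would be invoked roughly like this (placeholder values; ./hash is the launcher script referenced in the comments in the next hunk):

    ./hash \
      --aws-key <access-key-id> \
      --aws-secret <secret-access-key> \
      --aws-s3-endpoint s3.eu-west-1.amazonaws.com \
      s3a://<bucket>/repos/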
@@ -132,6 +144,28 @@ object HashSparkApp extends App with Logging {
     spark.sparkContext.hadoopConfiguration.set("google.cloud.auth.service.account.json.keyfile", config.gcsKeyFile)
   }
 
+  // AWS S3 combo
+  // The problem is that we use old versions of Spark & Hadoop, which have 4 issues:
+  // 1. They bring in an old amazon-aws package as a dependency,
+  //    which requires a separate flag to enable support for the current AWS protocol
+  spark.sparkContext.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true")
+  // 2. Only NativeS3FileSystem works correctly with the new protocol
+  spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
+  // 3. The client is configured to use the default S3A service endpoint,
+  //    but for the v4 protocol it must be set to the regional endpoint the bucket belongs to
+  if (config.awsS3Endpoint.nonEmpty) {
+    spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", config.awsS3Endpoint)
+  }
+  // 4. Globbing (from jgit-spark-connector) with key & secret in the URL isn't supported by the current version:
+  //    $ ./hash s3a://key:token@bucket/repos/
+  //    Error: "Wrong FS: s3a://key:token@bucket/repos/*, expected: s3a://key:token@bucket"
+  if (config.awsKey.nonEmpty) {
+    spark.sparkContext.hadoopConfiguration.set("fs.s3a.awsAccessKeyId", config.awsKey)
+  }
+  if (config.awsSecret.nonEmpty) {
+    spark.sparkContext.hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", config.awsSecret)
+  }
+
   val reposPath = config.reposPath
   val repos = listRepositories(reposPath, config.format, spark, config.limit)
   printRepositories(reposPath, repos)
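
For reference, the same four properties could also be injected without code changes through Spark's spark.hadoop.* configuration passthrough, which copies any such property into the Hadoop configuration; a sketch with placeholder values, assuming the job is launched via spark-submit:

    spark-submit \
      --conf spark.hadoop.com.amazonaws.services.s3.enableV4=true \
      --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3native.NativeS3FileSystem \
      --conf spark.hadoop.fs.s3a.endpoint=<regional-endpoint> \
      --conf spark.hadoop.fs.s3a.awsAccessKeyId=<access-key-id> \
      --conf spark.hadoop.fs.s3a.awsSecretAccessKey=<secret-access-key> \
      ...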
