Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 42 additions & 9 deletions src/main/scala/tech/sourced/gemini/Gemini.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import org.eclipse.jgit.lib.Constants.OBJ_BLOB
import org.eclipse.jgit.lib.ObjectInserter
import org.slf4j.{Logger => Slf4jLogger}
import tech.sourced.engine.Engine
import tech.sourced.engine._
import tech.sourced.featurext.generated.service.FeatureExtractorGrpc.FeatureExtractor
import tech.sourced.gemini.cmd.ReportApp
import tech.sourced.gemini.util.{Logger, URLFormatter}

import scala.io.Source
Expand Down Expand Up @@ -61,19 +61,52 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
def getRepos(reposPath: String, limit: Int = 0, format: String = "siva"): DataFrame = {
val engine = Engine(session, reposPath, format)
val repos = engine.getRepositories
// siva files contain only 1 remote per repository
// no need to do deduplication
val onlyUnique = format != "siva"

// get repositories ids to filter jgit-results
if (onlyUnique || limit > 0) {
val reposIdsSubset = if (onlyUnique) {
uniqueRepoIds(repos)
} else {
repos.select("id")
}

// engine.getRepositories.limit(n)...getFiles - doesn't work in engine now
// https://github.com/src-d/engine/issues/267
// use workaround with filter
if (limit <= 0) {
repos
} else {
log.info(s"Using only $limit repositories")
val repoIds = repos.limit(limit).select($"id").collect().map(_ (0))
val limitedReposIds = if (limit > 0) {
log.info(s"Using only $limit repositories")
reposIdsSubset.limit(limit)
} else {
reposIdsSubset
}

val repoIds = limitedReposIds.collect().map(_ (0))
repos.filter($"id".isin(repoIds: _*))
} else {
repos
}
}

def uniqueRepoIds(repos: DataFrame): DataFrame = {
repos
.getHEAD
.groupByKey(row => row.getAs[String]("hash"))
.reduceGroups {(r1, r2) =>
// In case of multiple repositories have the same HEAD we keep only one
// we prefer remote over local one
// in case of multiple remotes we choose random one
val v1 = r1.getAs[String]("repository_id")
if (v1.isEmpty || v1.startsWith("file://")) {
r2
} else {
r1
}
}
// FIXME there must be better way to do it but I couldn't find it
.withColumnRenamed("ReduceAggregator(org.apache.spark.sql.Row)", "tmp")
.select("tmp.repository_id")
}

val fileWithFuncPattern: Regex = "(.+):(.+):([0-9]+)".r

/**
Expand Down