Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/main/resources/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ USE __KEYSPACE__;
CREATE TABLE IF NOT EXISTS __KEYSPACE__.meta (sha1 ascii, repo text, commit ascii, path text, PRIMARY KEY (sha1, repo, commit, path));
CREATE TABLE IF NOT EXISTS __KEYSPACE__.hashtables_file (sha1 text, hashtable tinyint, value blob, PRIMARY KEY (hashtable, value, sha1));
CREATE TABLE IF NOT EXISTS __KEYSPACE__.hashtables_func (sha1 text, hashtable tinyint, value blob, PRIMARY KEY (hashtable, value, sha1));
CREATE TABLE IF NOT EXISTS __KEYSPACE__.docfreq (id text, docs int, df map<text, int>, PRIMARY KEY (id));
CREATE TABLE IF NOT EXISTS __KEYSPACE__.features_docs (id text, docs int, PRIMARY KEY (id));
CREATE TABLE IF NOT EXISTS __KEYSPACE__.features_freq (id text, feature text, weight int, PRIMARY KEY (id, feature));
21 changes: 15 additions & 6 deletions src/main/scala/tech/sourced/gemini/Database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,31 @@ import scala.collection.JavaConverters._

case class MetaCols(sha: String, repo: String, commit: String, path: String)
case class HashtablesCols(sha: String, hashtable: String, value: String)
case class DocFreqCols(id: String, docs: String, df: String)
case class FeaturesDocsCols(id: String, docs: String)
case class FeaturesFreqCols(id: String, feature: String, weight: String)

/**
* Tables is static typed definition of DB schema
*
* @param meta name of meta table
* @param hashtables name of hashtables table
* @param metaCols
* @param hashtablesCols
* @param hashtables prefix of hashtables table
* @param featuresDocs name of features documents table
* @param featuresFreq name of features frequencies table
* @param metaCols columns of meta table
* @param hashtablesCols columns of hashtables table
* @param featuresDocsCols columns of features documents table
* @param featuresFreqCols columns of features frequencies table
*/
case class Tables(meta: String,
hashtables: String,
docFreq: String,
featuresDocs: String,
featuresFreq: String,
metaCols: MetaCols,
hashtablesCols: HashtablesCols,
docFreqCols: DocFreqCols)
featuresDocsCols: FeaturesDocsCols,
featuresFreqCols: FeaturesFreqCols) {
def hashtables(mode: String): String = s"${hashtables}_$mode"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use ${x} or $x and not a mix of them at least not in the same string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I don't write it as above, the IntelliJ IDEA linter complains.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it complain? 😕

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't use $x because of "cannot resolve symbol hashtables_".
If I use ${mode} it complains that "the enclosing block is redundant".

I try to keep IntelliJ IDEA happy because right now it's the only linter we have.
Some work about style/lint was done before in #87 and #86, but there is no agreement.
If we add some opinionated linter/formatter (like gofmt or prettier) most probably it would be possible to update code to follow the style automatically.

}

/**
* Database object contains common queries to DB
Expand Down
25 changes: 16 additions & 9 deletions src/main/scala/tech/sourced/gemini/FileQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,14 @@ class FileQuery(
case Gemini.funcSimilarityMode => FeaturesHash.funcParams
}

val hashtablesTable = s"${tables.hashtables}_${mode}"
val cols = tables.hashtablesCols
val wmh = hashFile(featuresList, docFreq, sampleSize)

val bands = FeaturesHash.wmhToBands(wmh, htnum, bandSize)

log.info("Looking for similar items")
val similar = bands.zipWithIndex.foldLeft(Set[String]()) { case (sim, (band, i)) =>
val cql = s"""SELECT ${cols.sha} FROM $keyspace.${hashtablesTable}
val cql = s"""SELECT ${cols.sha} FROM $keyspace.${tables.hashtables(mode)}
WHERE ${cols.hashtable}=$i AND ${cols.value}=0x${MathUtil.bytes2hex(band)}"""
log.debug(cql)

Expand Down Expand Up @@ -186,18 +185,26 @@ class FileQuery(

protected def readDocFreqFromDB(): Option[OrderedDocFreq] = {
log.info(s"Reading docFreq from DB")
val cols = tables.docFreqCols
val row = conn.execute(s"SELECT * FROM ${tables.docFreq} WHERE ${cols.id} = '${mode}'").one()
if (row == null) {
val docsCols = tables.featuresDocsCols
val freqCols = tables.featuresFreqCols
val docsRow = conn.execute(s"SELECT * FROM ${tables.featuresDocs} WHERE ${docsCols.id} = '$mode'").one()
if (docsRow == null) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null in Scala should be avoided. It is usually preferred to wrap it with Option. You could then also change the if-else with the .fold method of Option:

    docsRow.fold[Option[OrderedDocFreq]]({
      log.warn("Document frequency table is empty.")
      None
    })(r => {
      var tokens = IndexedSeq[String]()
      val df = conn
        .execute(s"SELECT * FROM ${tables.featuresFreq} WHERE ${freqCols.id} = '$mode'")
        .asScala
        .map { row =>
          // tokens have to be sorted, df.keys isn't sorted
          val name = row.getString(freqCols.feature)
          tokens = tokens :+ name

          (name, row.getInt(freqCols.weight))
        }.toMap

      Some(OrderedDocFreq(r.getInt(docsCols.docs), tokens, df))
    })

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can I avoid null here? It's a Java library that returns null.
This code works similar to Converting a null into an Option, or something else

Copy link

@se7entyse7en se7entyse7en Jan 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By doing as follows:

val docsRow = Option(conn.execute(s"SELECT * FROM ${tables.featuresDocs} WHERE ${docsCols.id} = '$mode'").one())

But maybe it's just me who doesn't like reading null, so I usually wrap it with Option as soon as possible, also in order to use the Scala stdlib (such as fold in this case).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm. I see what you mean now. Imo in this particular case checking null makes the code simpler.
Also, I see some other checks on null in our code base but we don't do Option(<something>) at all.

Please let me know if you see any clear disadvantages of the current code or clear advantages of the code with fold and I'll update it.

Copy link

@se7entyse7en se7entyse7en Jan 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I don't think there's any clear pros/cons in favour/disfavour for one or another. I think it's more a style thing.

Also, I see some other checks on null in our code base but we don't do Option() at all.

In this case I think we should be consistent.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for completeness, using pattern matching is even more readable imo 😛

    Option(docsRow) match {
      case None => {
        log.warn("Document frequency table is empty.")
        None
      }
      case Some(row) => {
        var tokens = IndexedSeq[String]()
        val df = conn
          .execute(s"SELECT * FROM ${tables.featuresFreq} WHERE ${freqCols.id} = '$mode'")
          .asScala
          .map { row =>
            // tokens have to be sorted, df.keys isn't sorted
            val name = row.getString(freqCols.feature)
            tokens = tokens :+ name

            (name, row.getInt(freqCols.weight))
          }.toMap

        Some(OrderedDocFreq(docsRow.getInt(docsCols.docs), tokens, df))
      }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope you agreed to keep the code as it is now.

With your last example I don't see why it's better:

  1. you wrap into option only to do case None/Some. Most probably it's possible to write the same with case null/_
  2. it adds one more level of indentation and cyclomatic complexity compared to a simple if/else.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! I just wrote it down for completeness. Anyway I just found it more idiomatic, but I don't actually have that much experience in Scala to say what is more idiomatic and what is not. 👍

log.warn("Document frequency table is empty.")
None
} else {
val df = row
.getMap("df", classOf[java.lang.String], classOf[java.lang.Integer])
var tokens = IndexedSeq[String]()
val df = conn
.execute(s"SELECT * FROM ${tables.featuresFreq} WHERE ${freqCols.id} = '$mode' ORDER BY ${freqCols.feature}")
.asScala
.mapValues(_.toInt)
.map { row =>
// tokens have to be sorted, df.keys isn't sorted

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you want them to be sorted? Alphabetical? Isn't it possible to sort outside the block? I'm thinking about something as:

      val df = conn
        .execute(s"SELECT * FROM ${tables.featuresFreq} WHERE ${freqCols.id} = '$mode'")
        .asScala
        .map(row => (row.getString(freqCols.feature), row.getInt(freqCols.weight)))
        .toMap
      Some(OrderedDocFreq(r.getInt(docsCols.docs), df.keys.toIndexedSeq.sorted, df))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alphabetical. DB already returns it in the correct order.
This map is quite big. On a small 1k repos dataset serialized version of this map took 45mb. I would prefer to avoid unnecessary sorting if it's possible.

Copy link

@se7entyse7en se7entyse7en Jan 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know cassandra actually, so correct me if I'm wrong.

Isn't relying on the db ordering without actually specifying it at query time a bit dangerous? Let me explain better. I guess that in the table there's some sort of index that is being used as ordering key, and we're using it implicitly here. But if we change something at db level we also need to remember to change the code. Isn't it better to also include the ordering in the query? I guess that it won't affect the execution time given that even without including it, the db still orders them using that ordering.

Copy link

@se7entyse7en se7entyse7en Jan 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If then we change something at db level everything should continue to work. It could just perform much slower (in case for example we remove the ordering key).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(mode, feature) is a primary key that's why it's sorted.

Thanks for pointing out that I missed the ORDER BY in the SELECT! I'll add it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

val name = row.getString(freqCols.feature)
tokens = tokens :+ name

Some(OrderedDocFreq(row.getInt(cols.docs), df.keys.toIndexedSeq, df))
(name, row.getInt(freqCols.weight))
}.toMap

Some(OrderedDocFreq(docsRow.getInt(docsCols.docs), tokens, df))
}
}

Expand Down
24 changes: 16 additions & 8 deletions src/main/scala/tech/sourced/gemini/Gemini.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,28 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
}

def isDBEmpty(session: Session, mode: String): Boolean = {
var row = session.execute(s"select count(*) from $keyspace.${tables.docFreq} where id='$mode' limit 1").one()
if (row.getLong(0) > 0) {
var row = session.execute(s"select * from $keyspace.${tables.featuresDocs} where id='$mode' limit 1").one()
if (row != null) {
return false
}

row = session.execute(s"select count(*) from $keyspace.${tables.hashtables}_$mode").one()
if (row.getLong(0) > 0) {
row = session.execute(s"select * from $keyspace.${tables.featuresFreq} where id='$mode' limit 1").one()
if (row != null) {
return false
}

row = session.execute(s"select * from $keyspace.${tables.hashtables}_$mode limit 1").one()
if (row != null) {
return false
}

true
}

def cleanDB(session: Session, mode: String): Unit = {
session.execute(s"delete from $keyspace.${tables.docFreq} where id='$mode'")
session.execute(s"truncate table $keyspace.${tables.hashtables}_$mode")
session.execute(s"delete from $keyspace.${tables.featuresDocs} where id='$mode'")
session.execute(s"delete from $keyspace.${tables.featuresFreq} where id='$mode'")
session.execute(s"truncate table $keyspace.${tables.hashtables(mode)}")
}

def applySchema(session: Session): Unit = {
Expand Down Expand Up @@ -198,10 +204,12 @@ object Gemini {
val tables = Tables(
"meta",
"hashtables",
"docfreq",
"features_docs",
"features_freq",
MetaCols("sha1", "repo", "commit", "path"),
HashtablesCols("sha1", "hashtable", "value"),
DocFreqCols("id", "docs", "df")
FeaturesDocsCols("id", "docs"),
FeaturesFreqCols("id", "feature", "weight")
)

val formatter = new ObjectInserter.Formatter
Expand Down
20 changes: 13 additions & 7 deletions src/main/scala/tech/sourced/gemini/Hash.scala
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,21 @@ class Hash(session: SparkSession,

protected def saveDocFreqToDB(docFreq: OrderedDocFreq, keyspace: String, tables: Tables): Unit = {
log.warn(s"save document frequencies to DB")
CassandraConnector(session.sparkContext).withSessionDo { cassandra =>
val cols = tables.docFreqCols
val javaMap = docFreq.df.asJava

CassandraConnector(session.sparkContext).withSessionDo { cassandra =>
val docsCols = tables.featuresDocsCols
cassandra.execute(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's a good idea to do execute on each row.
Maybe it's better to sc.parallelize(seq).saveToCassandra.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding this may better wait for someone with more insights than me on Spark. 😛

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We calculate docFreq on the driver. sc.parallelize would split it and copy it to the workers so we could work with the collection in parallel. But I'm not sure saving to the DB would actually benefit much from it, also taking into account that the copying isn't free.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's already in the driver, can't we write in batches from there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly because Cassandra does not recommend it

But batches are often mistakenly used in an attempt to optimize performance. Depending on the batch operation, the performance may actually worsen. Some batch operations place a greater burden on the coordinator node and lessen the efficiency of the data insertion. The number of partitions involved in a batch operation, and thus the potential for multi-node accessing, can increase the latency dramatically. In all batching, the coordinator node manages all write operations, so that the coordinator node can pose a bottleneck to completion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the code to use a prepared statement to make it a bit better.

s"INSERT INTO $keyspace.${tables.docFreq} (${cols.id}, ${cols.docs}, ${cols.df}) VALUES (?, ?, ?)",
mode, int2Integer(docFreq.docs), javaMap
s"INSERT INTO $keyspace.${tables.featuresDocs} (${docsCols.id}, ${docsCols.docs}) VALUES (?, ?)",
mode, int2Integer(docFreq.docs)
)

val freqCols = tables.featuresFreqCols
val prepared = cassandra.prepare(s"INSERT INTO $keyspace.${tables.featuresFreq}" +
s"(${freqCols.id}, ${freqCols.feature}, ${freqCols.weight}) VALUES (?, ?, ?)")

docFreq.df.foreach { case(feature, weight) =>
cassandra.execute(prepared.bind(mode, feature, int2Integer(weight)))
}
}
}

Expand Down Expand Up @@ -267,7 +274,6 @@ class Hash(session: SparkSession,
case Gemini.funcSimilarityMode => FeaturesHash.funcParams
}

val hashtablesTable = s"${tables.hashtables}_${mode}"
val cols = tables.hashtablesCols
rdd
.flatMap { case RDDHash(doc, wmh) =>
Expand All @@ -276,7 +282,7 @@ class Hash(session: SparkSession,
.toDF(cols.sha, cols.hashtable, cols.value)
.write
.mode("append")
.cassandraFormat(hashtablesTable, keyspace)
.cassandraFormat(tables.hashtables(mode), keyspace)
.save()
}

Expand Down
3 changes: 1 addition & 2 deletions src/main/scala/tech/sourced/gemini/Report.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ class Report(conn: Session, log: Slf4jLogger, keyspace: String, tables: Tables)
*/
def findConnectedComponents(mode: String): (Map[Int, Set[Int]], Map[Int, List[Int]], Map[String, Int]) = {
log.info(s"Finding ${mode} connected components")
val hashtablesTable = s"${tables.hashtables}_${mode}"
val cc = new DBConnectedComponents(log, conn, hashtablesTable, keyspace)
val cc = new DBConnectedComponents(log, conn, tables.hashtables(mode), keyspace)
val (buckets, elementIds) = cc.makeBuckets()
val elsToBuckets = cc.elementsToBuckets(buckets)

Expand Down
20 changes: 13 additions & 7 deletions src/test/scala/tech/sourced/gemini/BaseDBSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,30 @@ trait BaseDBSpec extends BeforeAndAfterAll {
}

def insertHashtables(items: Iterable[HashtableItem], mode: String): Unit = {
val hashtablesTable = s"${Gemini.tables.hashtables}_${mode}"
val cols = Gemini.tables.hashtablesCols
items.foreach { case HashtableItem(ht, v, sha1) =>
val cql = s"""INSERT INTO $keyspace.${hashtablesTable}
val cql = s"""INSERT INTO $keyspace.${Gemini.tables.hashtables(mode)}
(${cols.hashtable}, ${cols.value}, ${cols.sha})
VALUES ($ht, $v, '$sha1')"""
cassandra.execute(cql)
}
}

def insertDocFreq(docFreq: OrderedDocFreq, mode: String): Unit = {
val cols = Gemini.tables.docFreqCols
val javaMap = docFreq.df.asJava

val docsCols = Gemini.tables.featuresDocsCols
cassandra.execute(
s"INSERT INTO $keyspace.${Gemini.tables.docFreq} (${cols.id}, ${cols.docs}, ${cols.df}) VALUES (?, ?, ?)",
mode, int2Integer(docFreq.docs), javaMap
s"INSERT INTO $keyspace.${Gemini.tables.featuresDocs} (${docsCols.id}, ${docsCols.docs}) VALUES (?, ?)",
mode, int2Integer(docFreq.docs)
)

val freqCols = Gemini.tables.featuresFreqCols
docFreq.df.foreach { case(feature, weight) =>
cassandra.execute(
s"INSERT INTO $keyspace.${Gemini.tables.featuresFreq}" +
s"(${freqCols.id}, ${freqCols.feature}, ${freqCols.weight}) VALUES (?, ?, ?)",
mode, feature, int2Integer(weight)
)
}
}

override def afterAll(): Unit = {
Expand Down