Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,12 @@ lazy val `kamon-jdbc` = (project in file("instrumentation/kamon-jdbc"))
logbackClassic % "test",
"com.typesafe.slick" %% "slick-hikaricp" % slickVersion(scalaBinaryVersion.value) % "test",
"com.h2database" % "h2" % "1.4.192" % "test",
"org.postgresql" % "postgresql" % "42.5.4" % "test",
"software.amazon.jdbc" % "aws-advanced-jdbc-wrapper" % "2.5.6" % "test",
"com.mysql" % "mysql-connector-j" % "9.3.0" % "test",
"org.xerial" % "sqlite-jdbc" % "3.34.0" % "test",
"ch.vorburger.mariaDB4j" % "mariaDB4j" % "2.5.3" % "test"
"org.testcontainers" % "postgresql" % "1.19.8" % "test",
"org.testcontainers" % "mysql" % "1.19.8" % "test"
)
).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")

Expand Down
3 changes: 3 additions & 0 deletions instrumentation/kamon-jdbc/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,17 @@ kanela.modules {
"oracle.jdbc.",
"com.amazon.redshift.jdbc42.",
"com.amazon.redshift.core.jdbc42.",
"software.amazon.jdbc.",
"com.mysql.jdbc.",
"com.mysql.cj.jdbc.",
"org.duckdb.",
"org.h2.Driver",
"org.h2.jdbc.",
"org.hsqldb.jdbc.",
"net.sf.log4jdbc.",
"org.mariadb.jdbc.",
"org.postgresql.jdbc.",
"org.postgresql.Driver",
"com.facebook.presto.jdbc.",
"com.microsoft.sqlserver.jdbc.",
"net.snowflake.client.jdbc.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import kanela.agent.api.instrumentation.bridge.FieldBridge
import kanela.agent.libs.net.bytebuddy.asm.Advice

import scala.util.Try

import kanela.agent.libs.net.bytebuddy.description.`type`.TypeDescription
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatcher
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers._

import scala.annotation.static
Expand Down Expand Up @@ -57,13 +60,16 @@ class StatementInstrumentation extends InstrumentationBuilder {
.mixin(classOf[HasDatabaseTags.Mixin])
.mixin(classOf[HasConnectionPoolTelemetry.Mixin])

onTypesMatching(hasSuperType(named("java.sql.Statement")).and(not(nameStartsWith("com.zaxxer.hikari"))))
private val ignoredDriverWrappers: ElementMatcher.Junction[TypeDescription] =
nameStartsWith("com.zaxxer.hikari").or(nameStartsWith("software.amazon.jdbc.wrapper"))

onTypesMatching(hasSuperType(named("java.sql.Statement")).and(not(ignoredDriverWrappers)))
.advise(method("execute").and(withOneStringArgument), classOf[StatementExecuteMethodAdvisor])
.advise(method("executeQuery").and(withOneStringArgument), classOf[StatementExecuteQueryMethodAdvisor])
.advise(method("executeUpdate").and(withOneStringArgument), classOf[StatementExecuteUpdateMethodAdvisor])
.advise(anyMethods("executeBatch", "executeLargeBatch"), classOf[StatementExecuteBatchMethodAdvisor])

onTypesMatching(hasSuperType(named("java.sql.PreparedStatement")).and(not(nameStartsWith("com.zaxxer.hikari"))))
onTypesMatching(hasSuperType(named("java.sql.PreparedStatement")).and(not(ignoredDriverWrappers)))
.advise(method("execute"), classOf[PreparedStatementExecuteMethodAdvisor])
.advise(method("executeQuery"), classOf[PreparedStatementExecuteQueryMethodAdvisor])
.advise(method("executeUpdate"), classOf[PreparedStatementExecuteUpdateMethodAdvisor])
Expand Down Expand Up @@ -98,7 +104,7 @@ class StatementInstrumentation extends InstrumentationBuilder {
case class DatabaseTags(metricTags: TagSet, spanTags: TagSet)

trait HasDatabaseTags {
def databaseTags(): DatabaseTags
def databaseTags: DatabaseTags
def setDatabaseTags(databaseTags: DatabaseTags): Unit
}

Expand All @@ -111,7 +117,7 @@ object HasDatabaseTags {
}

trait HasStatementSQL {
def capturedStatementSQL(): String
def capturedStatementSQL: String
def setStatementSQL(sql: String): Unit
}

Expand Down Expand Up @@ -157,7 +163,7 @@ object CreateStatementAdvice {

@Advice.OnMethodExit
@static def exit(@Advice.This connection: Any, @Advice.Return statement: Any): Unit = {
statement.asInstanceOf[HasDatabaseTags].setDatabaseTags(connection.asInstanceOf[HasDatabaseTags].databaseTags())
statement.asInstanceOf[HasDatabaseTags].setDatabaseTags(connection.asInstanceOf[HasDatabaseTags].databaseTags)
statement.asInstanceOf[HasConnectionPoolTelemetry].setConnectionPoolTelemetry(
connection.asInstanceOf[HasConnectionPoolTelemetry].connectionPoolTelemetry
)
Expand All @@ -173,7 +179,7 @@ object CreatePreparedStatementAdvice {
@Advice.Argument(0) sql: String,
@Advice.Return statement: Any
): Unit = {
statement.asInstanceOf[HasDatabaseTags].setDatabaseTags(connection.asInstanceOf[HasDatabaseTags].databaseTags())
statement.asInstanceOf[HasDatabaseTags].setDatabaseTags(connection.asInstanceOf[HasDatabaseTags].databaseTags)
statement.asInstanceOf[HasConnectionPoolTelemetry].setConnectionPoolTelemetry(
statement.asInstanceOf[HasConnectionPoolTelemetry].connectionPoolTelemetry
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ object StatementMonitor extends LoggingSupport {
val poolTelemetry = cpt.connectionPoolTelemetry.get()
(poolTelemetry.instruments.inFlightStatements, poolTelemetry.databaseTags)

case dbt: HasDatabaseTags if dbt.databaseTags() != null =>
(JdbcMetrics.InFlightStatements.withTags(dbt.databaseTags().metricTags), dbt.databaseTags())
case dbt: HasDatabaseTags if dbt.databaseTags != null =>
(JdbcMetrics.InFlightStatements.withTags(dbt.databaseTags.metricTags), dbt.databaseTags)

case _ =>
(JdbcMetrics.InFlightStatements.withoutTags(), DatabaseTags(TagSet.Empty, TagSet.Empty))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class PreparedStatementExecuteMethodAdvisor
object PreparedStatementExecuteMethodAdvisor {
@Advice.OnMethodEnter(suppress = classOf[Throwable])
@static def executeStart(@Advice.This statement: HasStatementSQL): Option[Invocation] = {
StatementMonitor.start(statement, statement.capturedStatementSQL(), StatementTypes.GenericExecute)
StatementMonitor.start(statement, statement.capturedStatementSQL, StatementTypes.GenericExecute)
}

@Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable])
Expand Down Expand Up @@ -79,7 +79,7 @@ class PreparedStatementExecuteQueryMethodAdvisor
object PreparedStatementExecuteQueryMethodAdvisor {
@Advice.OnMethodEnter(suppress = classOf[Throwable])
@static def executeStart(@Advice.This statement: HasStatementSQL): Option[Invocation] = {
StatementMonitor.start(statement, statement.capturedStatementSQL(), StatementTypes.Query)
StatementMonitor.start(statement, statement.capturedStatementSQL, StatementTypes.Query)
}

@Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable])
Expand Down Expand Up @@ -111,7 +111,7 @@ class PreparedStatementExecuteUpdateMethodAdvisor
object PreparedStatementExecuteUpdateMethodAdvisor {
@Advice.OnMethodEnter(suppress = classOf[Throwable])
@static def executeStart(@Advice.This statement: HasStatementSQL): Option[Invocation] = {
StatementMonitor.start(statement, statement.capturedStatementSQL(), StatementTypes.Update)
StatementMonitor.start(statement, statement.capturedStatementSQL, StatementTypes.Update)
}

@Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable])
Expand All @@ -130,7 +130,7 @@ object StatementExecuteBatchMethodAdvisor {
@Advice.OnMethodEnter(suppress = classOf[Throwable])
@static def executeStart(@Advice.This statement: Any): Option[Invocation] = {
val statementSQL = statement match {
case hSQL: HasStatementSQL => hSQL.capturedStatementSQL()
case hSQL: HasStatementSQL => hSQL.capturedStatementSQL
case _ => statement.toString
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.time.SpanSugar
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.{BeforeAndAfterEach, OptionValues}
import org.testcontainers.containers.PostgreSQLContainer

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}

Expand All @@ -42,7 +43,7 @@ class HikariInstrumentationSpec extends AnyWordSpec
with InitAndStopKamonAfterAll
with OptionValues {

import HikariInstrumentationSpec.{createH2Pool, createSQLitePool}
import HikariInstrumentationSpec.{createH2Pool, createSQLitePool, createAwsWrapperPool}
implicit val parallelQueriesContext: ExecutionContextExecutor =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))

Expand All @@ -55,6 +56,7 @@ class HikariInstrumentationSpec extends AnyWordSpec
)

"the Hikari instrumentation" should {

"track each hikari pool using the pool name as tag and cleanup after closing the pool" in {
val pool1 = createH2Pool("example-1")
val pool2 = createH2Pool("example-2")
Expand Down Expand Up @@ -209,6 +211,20 @@ class HikariInstrumentationSpec extends AnyWordSpec
connection.close()
pool.close()
}

"construct pools with the AWS wrapper" in {
val postgresqlContainer = new PostgreSQLContainer("postgres:16")
postgresqlContainer.start()
val url = postgresqlContainer
.getJdbcUrl()
.replace("postgresql", "aws-wrapper:postgresql") + "&user=test&password=test"

val pool = createAwsWrapperPool("aws-wrapper", url)
val connection = pool.getConnection()
connection.close()
pool.close()
postgresqlContainer.stop()
}
}
}

Expand Down Expand Up @@ -244,6 +260,16 @@ object HikariInstrumentationSpec {
hikariPool
}

def createAwsWrapperPool(name: String, url: String, size: Int = 1): HikariDataSource = {
System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "200")

val config = basicConfig(name, size)
config.setJdbcUrl(url)

val hikariPool = new HikariDataSource(config)
hikariPool
}

private def basicConfig(name: String, size: Int): HikariConfig = {
val config = new HikariConfig()
config.setConnectionInitSql("SELECT 1;")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package kamon.instrumentation.jdbc

import ch.vorburger.mariadb4j.DB
import com.typesafe.config.ConfigFactory
import kamon.Kamon
import kamon.tag.Lookups._
Expand All @@ -28,6 +27,8 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.time.SpanSugar
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.{BeforeAndAfterEach, OptionValues}
import org.testcontainers.containers.PostgreSQLContainer
import org.testcontainers.containers.MySQLContainer

import java.sql.{Connection, DriverManager, ResultSet}
import java.time.Duration
Expand Down Expand Up @@ -65,11 +66,13 @@ class StatementInstrumentationSpec extends AnyWordSpec
override implicit def patienceConfig: PatienceConfig = PatienceConfig(timeout = scaled(2 seconds))

val drivers = Seq(
DriverSuite.Postgres,
DriverSuite.H2,
DriverSuite.SQLite,
DriverSuite.MySQL,
DriverSuite.HikariH2
).filter(canRunInCurrentEnvironment)
DriverSuite.HikariH2,
DriverSuite.PostgresWithAWSAdvancedDriver
)

"the StatementInstrumentation" when {
drivers.foreach { driver =>
Expand Down Expand Up @@ -265,7 +268,7 @@ class StatementInstrumentationSpec extends AnyWordSpec
if (driver.supportSleeping) {
val vendorTags = TagSet.of("db.vendor", driver.vendor)

for (id 1 to 10) yield {
for (id <- 1 to 10) yield {
Future {
val connection = driver.connect()
driver.sleep(connection, Duration.ofMillis(1500))
Expand Down Expand Up @@ -437,35 +440,90 @@ class StatementInstrumentationSpec extends AnyWordSpec
object MySQL extends DriverSuite with AddressTableSetup {
val name = "MySQL"
val vendor = "mysql"
val url = "jdbc:mysql://localhost/test"
val supportSleeping = false
lazy val mysqlContainer = new MySQLContainer("mysql:8.4.5")
lazy val url = mysqlContainer.getJdbcUrl() + "?user=test&password=test"
lazy val connection = DriverManager.getConnection(url)

override def init(): Unit = {
DB.newEmbeddedDB(3306).start()
mysqlContainer.start()
initializeAddressTable(connect())
}

override def connect(): Connection = connection

override def sleep(connection: Connection, duration: Duration): Unit = {}

override def cleanup() = connection.close()
override def cleanup() = {
connection.close()
mysqlContainer.stop()
}
}

object Postgres extends DriverSuite with AddressTableSetup {
val name = "Postgres"
val vendor = "postgresql"
val supportSleeping = false
lazy val postgresqlContainer = new PostgreSQLContainer("postgres:16")
lazy val url = postgresqlContainer.getJdbcUrl() + "&user=test&password=test"
lazy val connection = DriverManager.getConnection(url)

override def init(): Unit = {
postgresqlContainer.start()
initializeAddressTable(connect())
}

override def connect(): Connection = connection

override def sleep(connection: Connection, duration: Duration): Unit = {}

override def cleanup() = {
connection.close()
postgresqlContainer.stop()
}
}

object PostgresWithAWSAdvancedDriver extends DriverSuite with AddressTableSetup {
val name = "Postgres with AWS Advanced Driver"
val vendor = "postgresql"
val supportSleeping = false

// It seems like the AWS wrapper will remove all parameters we pass on the
// connection string and turn them into properties before passing them to
// the underlying driver, so we need to cleanup the URL for all assertions
// to work as epected.
val connectionProperties = new java.util.Properties()
connectionProperties.setProperty("user", "test")
connectionProperties.setProperty("password", "test")
lazy val postgresqlContainer = new PostgreSQLContainer("postgres:16")
lazy val url = postgresqlContainer.getJdbcUrl().replace("?loggerLevel=OFF", "")
lazy val urlForConnection = url.replace("postgresql", "aws-wrapper:postgresql")
lazy val connection = DriverManager.getConnection(urlForConnection, connectionProperties)

override def init(): Unit = {
postgresqlContainer.start()
initializeAddressTable(connect())
}

override def connect(): Connection = connection

override def sleep(connection: Connection, duration: Duration): Unit = {}

override def cleanup() = {
connection.close()
postgresqlContainer.stop()
}
}

trait AddressTableSetup {
def initializeAddressTable(connection: Connection): Unit = {
connection.createStatement().executeUpdate("DROP TABLE IF EXISTS Address;")
connection.createStatement().executeUpdate("CREATE TABLE Address (Nr INTEGER, Name VARCHAR(128));")
connection.createStatement().executeUpdate(s"INSERT INTO Address (Nr, Name) VALUES(1, 'foo')")
connection.createStatement().executeUpdate(s"INSERT INTO Address (Nr, Name) VALUES(2, 'foo')")
connection.createStatement().executeUpdate(s"INSERT INTO Address (Nr, Name) VALUES(3, 'foo')")
connection.createStatement().executeUpdate(s"INSERT INTO Address (Nr, Name) VALUES(4, 'foo')")
connection.createStatement().executeUpdate("INSERT INTO Address (Nr, Name) VALUES(1, 'foo')")
connection.createStatement().executeUpdate("INSERT INTO Address (Nr, Name) VALUES(2, 'foo')")
connection.createStatement().executeUpdate("INSERT INTO Address (Nr, Name) VALUES(3, 'foo')")
connection.createStatement().executeUpdate("INSERT INTO Address (Nr, Name) VALUES(4, 'foo')")
}
}
}

def canRunInCurrentEnvironment(driverSuite: DriverSuite): Boolean = {
!(System.getenv("TRAVIS") != null && driverSuite.vendor == "mysql")
}
}
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object BaseProject extends AutoPlugin {
/** Marker configuration for dependencies that will be shaded into their module's jar. */
lazy val Shaded = config("shaded").hide

val kanelaAgent = "io.kamon" % "kanela-agent" % "2.0.0-beta.1"
val kanelaAgent = "io.kamon" % "kanela-agent" % "2.0.0-beta.3"
val slf4jApi = "org.slf4j" % "slf4j-api" % "2.0.17"
val slf4jnop = "org.slf4j" % "slf4j-nop" % "2.0.17"
val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.3.15"
Expand Down