From 8def3b819f150ad07948504f0b49c67240f94f40 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 8 Jun 2025 09:58:55 +0200 Subject: [PATCH] add support for the DuckDB and AWS Advanced Wrapper drivers in JDBC Adds support for the two drivers and includes a few additional tests for the wrapper, both with and withouth the Hikari pool. Also during testing I realized that we have to explicitly ignore some of the AWS wrapper classes to avoid having double spans for each database interaction. Funny enough, in that setup we have Hikari wrapping the AWS wrapper which wraps the actual wrapper --- build.sbt | 6 +- .../src/main/resources/reference.conf | 3 + .../jdbc/StatementInstrumentation.scala | 18 ++-- .../jdbc/StatementMonitor.scala | 4 +- .../StatementInstrumentationAdvisors.scala | 8 +- .../jdbc/HikariInstrumentationSpec.scala | 28 +++++- .../jdbc/StatementInstrumentationSpec.scala | 88 +++++++++++++++---- project/Build.scala | 2 +- 8 files changed, 127 insertions(+), 30 deletions(-) diff --git a/build.sbt b/build.sbt index 3776ec40d..034a614c8 100644 --- a/build.sbt +++ b/build.sbt @@ -309,8 +309,12 @@ lazy val `kamon-jdbc` = (project in file("instrumentation/kamon-jdbc")) logbackClassic % "test", "com.typesafe.slick" %% "slick-hikaricp" % slickVersion(scalaBinaryVersion.value) % "test", "com.h2database" % "h2" % "1.4.192" % "test", + "org.postgresql" % "postgresql" % "42.5.4" % "test", + "software.amazon.jdbc" % "aws-advanced-jdbc-wrapper" % "2.5.6" % "test", + "com.mysql" % "mysql-connector-j" % "9.3.0" % "test", "org.xerial" % "sqlite-jdbc" % "3.34.0" % "test", - "ch.vorburger.mariaDB4j" % "mariaDB4j" % "2.5.3" % "test" + "org.testcontainers" % "postgresql" % "1.19.8" % "test", + "org.testcontainers" % "mysql" % "1.19.8" % "test" ) ).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test") diff --git a/instrumentation/kamon-jdbc/src/main/resources/reference.conf b/instrumentation/kamon-jdbc/src/main/resources/reference.conf index 14679c37d..ff8169687 100644 --- a/instrumentation/kamon-jdbc/src/main/resources/reference.conf +++ b/instrumentation/kamon-jdbc/src/main/resources/reference.conf @@ -64,14 +64,17 @@ kanela.modules { "oracle.jdbc.", "com.amazon.redshift.jdbc42.", "com.amazon.redshift.core.jdbc42.", + "software.amazon.jdbc.", "com.mysql.jdbc.", "com.mysql.cj.jdbc.", + "org.duckdb.", "org.h2.Driver", "org.h2.jdbc.", "org.hsqldb.jdbc.", "net.sf.log4jdbc.", "org.mariadb.jdbc.", "org.postgresql.jdbc.", + "org.postgresql.Driver", "com.facebook.presto.jdbc.", "com.microsoft.sqlserver.jdbc.", "net.snowflake.client.jdbc.", diff --git a/instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/StatementInstrumentation.scala b/instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/StatementInstrumentation.scala index 22bf5763a..5f274910a 100644 --- a/instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/StatementInstrumentation.scala +++ b/instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/StatementInstrumentation.scala @@ -28,6 +28,9 @@ import kanela.agent.api.instrumentation.bridge.FieldBridge import kanela.agent.libs.net.bytebuddy.asm.Advice import scala.util.Try + +import kanela.agent.libs.net.bytebuddy.description.`type`.TypeDescription +import kanela.agent.libs.net.bytebuddy.matcher.ElementMatcher import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers._ import scala.annotation.static @@ -57,13 +60,16 @@ class StatementInstrumentation extends InstrumentationBuilder { .mixin(classOf[HasDatabaseTags.Mixin]) .mixin(classOf[HasConnectionPoolTelemetry.Mixin]) - onTypesMatching(hasSuperType(named("java.sql.Statement")).and(not(nameStartsWith("com.zaxxer.hikari")))) + private val ignoredDriverWrappers: ElementMatcher.Junction[TypeDescription] = + nameStartsWith("com.zaxxer.hikari").or(nameStartsWith("software.amazon.jdbc.wrapper")) + + onTypesMatching(hasSuperType(named("java.sql.Statement")).and(not(ignoredDriverWrappers))) .advise(method("execute").and(withOneStringArgument), classOf[StatementExecuteMethodAdvisor]) .advise(method("executeQuery").and(withOneStringArgument), classOf[StatementExecuteQueryMethodAdvisor]) .advise(method("executeUpdate").and(withOneStringArgument), classOf[StatementExecuteUpdateMethodAdvisor]) .advise(anyMethods("executeBatch", "executeLargeBatch"), classOf[StatementExecuteBatchMethodAdvisor]) - onTypesMatching(hasSuperType(named("java.sql.PreparedStatement")).and(not(nameStartsWith("com.zaxxer.hikari")))) + onTypesMatching(hasSuperType(named("java.sql.PreparedStatement")).and(not(ignoredDriverWrappers))) .advise(method("execute"), classOf[PreparedStatementExecuteMethodAdvisor]) .advise(method("executeQuery"), classOf[PreparedStatementExecuteQueryMethodAdvisor]) .advise(method("executeUpdate"), classOf[PreparedStatementExecuteUpdateMethodAdvisor]) @@ -98,7 +104,7 @@ class StatementInstrumentation extends InstrumentationBuilder { case class DatabaseTags(metricTags: TagSet, spanTags: TagSet) trait HasDatabaseTags { - def databaseTags(): DatabaseTags + def databaseTags: DatabaseTags def setDatabaseTags(databaseTags: DatabaseTags): Unit } @@ -111,7 +117,7 @@ object HasDatabaseTags { } trait HasStatementSQL { - def capturedStatementSQL(): String + def capturedStatementSQL: String def setStatementSQL(sql: String): Unit } @@ -157,7 +163,7 @@ object CreateStatementAdvice { @Advice.OnMethodExit @static def exit(@Advice.This connection: Any, @Advice.Return statement: Any): Unit = { - statement.asInstanceOf[HasDatabaseTags].setDatabaseTags(connection.asInstanceOf[HasDatabaseTags].databaseTags()) + statement.asInstanceOf[HasDatabaseTags].setDatabaseTags(connection.asInstanceOf[HasDatabaseTags].databaseTags) statement.asInstanceOf[HasConnectionPoolTelemetry].setConnectionPoolTelemetry( connection.asInstanceOf[HasConnectionPoolTelemetry].connectionPoolTelemetry ) @@ -173,7 +179,7 @@ object CreatePreparedStatementAdvice { @Advice.Argument(0) sql: String, @Advice.Return statement: Any ): Unit = { - statement.asInstanceOf[HasDatabaseTags].setDatabaseTags(connection.asInstanceOf[HasDatabaseTags].databaseTags()) + statement.asInstanceOf[HasDatabaseTags].setDatabaseTags(connection.asInstanceOf[HasDatabaseTags].databaseTags) statement.asInstanceOf[HasConnectionPoolTelemetry].setConnectionPoolTelemetry( statement.asInstanceOf[HasConnectionPoolTelemetry].connectionPoolTelemetry ) diff --git a/instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/StatementMonitor.scala b/instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/StatementMonitor.scala index 1d1fef0de..542a15fdd 100644 --- a/instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/StatementMonitor.scala +++ b/instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/StatementMonitor.scala @@ -92,8 +92,8 @@ object StatementMonitor extends LoggingSupport { val poolTelemetry = cpt.connectionPoolTelemetry.get() (poolTelemetry.instruments.inFlightStatements, poolTelemetry.databaseTags) - case dbt: HasDatabaseTags if dbt.databaseTags() != null => - (JdbcMetrics.InFlightStatements.withTags(dbt.databaseTags().metricTags), dbt.databaseTags()) + case dbt: HasDatabaseTags if dbt.databaseTags != null => + (JdbcMetrics.InFlightStatements.withTags(dbt.databaseTags.metricTags), dbt.databaseTags) case _ => (JdbcMetrics.InFlightStatements.withoutTags(), DatabaseTags(TagSet.Empty, TagSet.Empty)) diff --git a/instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/advisor/StatementInstrumentationAdvisors.scala b/instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/advisor/StatementInstrumentationAdvisors.scala index 627f524c6..e843a3fdd 100644 --- a/instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/advisor/StatementInstrumentationAdvisors.scala +++ b/instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/advisor/StatementInstrumentationAdvisors.scala @@ -47,7 +47,7 @@ class PreparedStatementExecuteMethodAdvisor object PreparedStatementExecuteMethodAdvisor { @Advice.OnMethodEnter(suppress = classOf[Throwable]) @static def executeStart(@Advice.This statement: HasStatementSQL): Option[Invocation] = { - StatementMonitor.start(statement, statement.capturedStatementSQL(), StatementTypes.GenericExecute) + StatementMonitor.start(statement, statement.capturedStatementSQL, StatementTypes.GenericExecute) } @Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable]) @@ -79,7 +79,7 @@ class PreparedStatementExecuteQueryMethodAdvisor object PreparedStatementExecuteQueryMethodAdvisor { @Advice.OnMethodEnter(suppress = classOf[Throwable]) @static def executeStart(@Advice.This statement: HasStatementSQL): Option[Invocation] = { - StatementMonitor.start(statement, statement.capturedStatementSQL(), StatementTypes.Query) + StatementMonitor.start(statement, statement.capturedStatementSQL, StatementTypes.Query) } @Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable]) @@ -111,7 +111,7 @@ class PreparedStatementExecuteUpdateMethodAdvisor object PreparedStatementExecuteUpdateMethodAdvisor { @Advice.OnMethodEnter(suppress = classOf[Throwable]) @static def executeStart(@Advice.This statement: HasStatementSQL): Option[Invocation] = { - StatementMonitor.start(statement, statement.capturedStatementSQL(), StatementTypes.Update) + StatementMonitor.start(statement, statement.capturedStatementSQL, StatementTypes.Update) } @Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable]) @@ -130,7 +130,7 @@ object StatementExecuteBatchMethodAdvisor { @Advice.OnMethodEnter(suppress = classOf[Throwable]) @static def executeStart(@Advice.This statement: Any): Option[Invocation] = { val statementSQL = statement match { - case hSQL: HasStatementSQL => hSQL.capturedStatementSQL() + case hSQL: HasStatementSQL => hSQL.capturedStatementSQL case _ => statement.toString } diff --git a/instrumentation/kamon-jdbc/src/test/scala/kamon/instrumentation/jdbc/HikariInstrumentationSpec.scala b/instrumentation/kamon-jdbc/src/test/scala/kamon/instrumentation/jdbc/HikariInstrumentationSpec.scala index 33036b7c0..64b248ff5 100644 --- a/instrumentation/kamon-jdbc/src/test/scala/kamon/instrumentation/jdbc/HikariInstrumentationSpec.scala +++ b/instrumentation/kamon-jdbc/src/test/scala/kamon/instrumentation/jdbc/HikariInstrumentationSpec.scala @@ -28,6 +28,7 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.time.SpanSugar import org.scalatest.wordspec.AnyWordSpec import org.scalatest.{BeforeAndAfterEach, OptionValues} +import org.testcontainers.containers.PostgreSQLContainer import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} @@ -42,7 +43,7 @@ class HikariInstrumentationSpec extends AnyWordSpec with InitAndStopKamonAfterAll with OptionValues { - import HikariInstrumentationSpec.{createH2Pool, createSQLitePool} + import HikariInstrumentationSpec.{createH2Pool, createSQLitePool, createAwsWrapperPool} implicit val parallelQueriesContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16)) @@ -55,6 +56,7 @@ class HikariInstrumentationSpec extends AnyWordSpec ) "the Hikari instrumentation" should { + "track each hikari pool using the pool name as tag and cleanup after closing the pool" in { val pool1 = createH2Pool("example-1") val pool2 = createH2Pool("example-2") @@ -209,6 +211,20 @@ class HikariInstrumentationSpec extends AnyWordSpec connection.close() pool.close() } + + "construct pools with the AWS wrapper" in { + val postgresqlContainer = new PostgreSQLContainer("postgres:16") + postgresqlContainer.start() + val url = postgresqlContainer + .getJdbcUrl() + .replace("postgresql", "aws-wrapper:postgresql") + "&user=test&password=test" + + val pool = createAwsWrapperPool("aws-wrapper", url) + val connection = pool.getConnection() + connection.close() + pool.close() + postgresqlContainer.stop() + } } } @@ -244,6 +260,16 @@ object HikariInstrumentationSpec { hikariPool } + def createAwsWrapperPool(name: String, url: String, size: Int = 1): HikariDataSource = { + System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "200") + + val config = basicConfig(name, size) + config.setJdbcUrl(url) + + val hikariPool = new HikariDataSource(config) + hikariPool + } + private def basicConfig(name: String, size: Int): HikariConfig = { val config = new HikariConfig() config.setConnectionInitSql("SELECT 1;") diff --git a/instrumentation/kamon-jdbc/src/test/scala/kamon/instrumentation/jdbc/StatementInstrumentationSpec.scala b/instrumentation/kamon-jdbc/src/test/scala/kamon/instrumentation/jdbc/StatementInstrumentationSpec.scala index 7a4b63d8f..5de42252f 100644 --- a/instrumentation/kamon-jdbc/src/test/scala/kamon/instrumentation/jdbc/StatementInstrumentationSpec.scala +++ b/instrumentation/kamon-jdbc/src/test/scala/kamon/instrumentation/jdbc/StatementInstrumentationSpec.scala @@ -15,7 +15,6 @@ package kamon.instrumentation.jdbc -import ch.vorburger.mariadb4j.DB import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.tag.Lookups._ @@ -28,6 +27,8 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.time.SpanSugar import org.scalatest.wordspec.AnyWordSpec import org.scalatest.{BeforeAndAfterEach, OptionValues} +import org.testcontainers.containers.PostgreSQLContainer +import org.testcontainers.containers.MySQLContainer import java.sql.{Connection, DriverManager, ResultSet} import java.time.Duration @@ -65,11 +66,13 @@ class StatementInstrumentationSpec extends AnyWordSpec override implicit def patienceConfig: PatienceConfig = PatienceConfig(timeout = scaled(2 seconds)) val drivers = Seq( + DriverSuite.Postgres, DriverSuite.H2, DriverSuite.SQLite, DriverSuite.MySQL, - DriverSuite.HikariH2 - ).filter(canRunInCurrentEnvironment) + DriverSuite.HikariH2, + DriverSuite.PostgresWithAWSAdvancedDriver + ) "the StatementInstrumentation" when { drivers.foreach { driver => @@ -265,7 +268,7 @@ class StatementInstrumentationSpec extends AnyWordSpec if (driver.supportSleeping) { val vendorTags = TagSet.of("db.vendor", driver.vendor) - for (id ← 1 to 10) yield { + for (id <- 1 to 10) yield { Future { val connection = driver.connect() driver.sleep(connection, Duration.ofMillis(1500)) @@ -437,12 +440,13 @@ class StatementInstrumentationSpec extends AnyWordSpec object MySQL extends DriverSuite with AddressTableSetup { val name = "MySQL" val vendor = "mysql" - val url = "jdbc:mysql://localhost/test" val supportSleeping = false + lazy val mysqlContainer = new MySQLContainer("mysql:8.4.5") + lazy val url = mysqlContainer.getJdbcUrl() + "?user=test&password=test" lazy val connection = DriverManager.getConnection(url) override def init(): Unit = { - DB.newEmbeddedDB(3306).start() + mysqlContainer.start() initializeAddressTable(connect()) } @@ -450,22 +454,76 @@ class StatementInstrumentationSpec extends AnyWordSpec override def sleep(connection: Connection, duration: Duration): Unit = {} - override def cleanup() = connection.close() + override def cleanup() = { + connection.close() + mysqlContainer.stop() + } + } + + object Postgres extends DriverSuite with AddressTableSetup { + val name = "Postgres" + val vendor = "postgresql" + val supportSleeping = false + lazy val postgresqlContainer = new PostgreSQLContainer("postgres:16") + lazy val url = postgresqlContainer.getJdbcUrl() + "&user=test&password=test" + lazy val connection = DriverManager.getConnection(url) + + override def init(): Unit = { + postgresqlContainer.start() + initializeAddressTable(connect()) + } + + override def connect(): Connection = connection + + override def sleep(connection: Connection, duration: Duration): Unit = {} + + override def cleanup() = { + connection.close() + postgresqlContainer.stop() + } + } + + object PostgresWithAWSAdvancedDriver extends DriverSuite with AddressTableSetup { + val name = "Postgres with AWS Advanced Driver" + val vendor = "postgresql" + val supportSleeping = false + + // It seems like the AWS wrapper will remove all parameters we pass on the + // connection string and turn them into properties before passing them to + // the underlying driver, so we need to cleanup the URL for all assertions + // to work as epected. + val connectionProperties = new java.util.Properties() + connectionProperties.setProperty("user", "test") + connectionProperties.setProperty("password", "test") + lazy val postgresqlContainer = new PostgreSQLContainer("postgres:16") + lazy val url = postgresqlContainer.getJdbcUrl().replace("?loggerLevel=OFF", "") + lazy val urlForConnection = url.replace("postgresql", "aws-wrapper:postgresql") + lazy val connection = DriverManager.getConnection(urlForConnection, connectionProperties) + + override def init(): Unit = { + postgresqlContainer.start() + initializeAddressTable(connect()) + } + + override def connect(): Connection = connection + + override def sleep(connection: Connection, duration: Duration): Unit = {} + + override def cleanup() = { + connection.close() + postgresqlContainer.stop() + } } trait AddressTableSetup { def initializeAddressTable(connection: Connection): Unit = { connection.createStatement().executeUpdate("DROP TABLE IF EXISTS Address;") connection.createStatement().executeUpdate("CREATE TABLE Address (Nr INTEGER, Name VARCHAR(128));") - connection.createStatement().executeUpdate(s"INSERT INTO Address (Nr, Name) VALUES(1, 'foo')") - connection.createStatement().executeUpdate(s"INSERT INTO Address (Nr, Name) VALUES(2, 'foo')") - connection.createStatement().executeUpdate(s"INSERT INTO Address (Nr, Name) VALUES(3, 'foo')") - connection.createStatement().executeUpdate(s"INSERT INTO Address (Nr, Name) VALUES(4, 'foo')") + connection.createStatement().executeUpdate("INSERT INTO Address (Nr, Name) VALUES(1, 'foo')") + connection.createStatement().executeUpdate("INSERT INTO Address (Nr, Name) VALUES(2, 'foo')") + connection.createStatement().executeUpdate("INSERT INTO Address (Nr, Name) VALUES(3, 'foo')") + connection.createStatement().executeUpdate("INSERT INTO Address (Nr, Name) VALUES(4, 'foo')") } } } - - def canRunInCurrentEnvironment(driverSuite: DriverSuite): Boolean = { - !(System.getenv("TRAVIS") != null && driverSuite.vendor == "mysql") - } } diff --git a/project/Build.scala b/project/Build.scala index 8c124bc13..3d0d6c980 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -35,7 +35,7 @@ object BaseProject extends AutoPlugin { /** Marker configuration for dependencies that will be shaded into their module's jar. */ lazy val Shaded = config("shaded").hide - val kanelaAgent = "io.kamon" % "kanela-agent" % "2.0.0-beta.1" + val kanelaAgent = "io.kamon" % "kanela-agent" % "2.0.0-beta.3" val slf4jApi = "org.slf4j" % "slf4j-api" % "2.0.17" val slf4jnop = "org.slf4j" % "slf4j-nop" % "2.0.17" val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.3.15"