Skip to content

Commit 8f45f7b

Browse files
authored
add support for the DuckDB and AWS Advanced Wrapper drivers in JDBC (#1401)
Adds support for the two drivers and includes a few additional tests for the wrapper, both with and withouth the Hikari pool. Also during testing I realized that we have to explicitly ignore some of the AWS wrapper classes to avoid having double spans for each database interaction. Funny enough, in that setup we have Hikari wrapping the AWS wrapper which wraps the actual wrapper
1 parent 1a4efce commit 8f45f7b

File tree

8 files changed

+127
-30
lines changed

8 files changed

+127
-30
lines changed

build.sbt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,12 @@ lazy val `kamon-jdbc` = (project in file("instrumentation/kamon-jdbc"))
309309
logbackClassic % "test",
310310
"com.typesafe.slick" %% "slick-hikaricp" % slickVersion(scalaBinaryVersion.value) % "test",
311311
"com.h2database" % "h2" % "1.4.192" % "test",
312+
"org.postgresql" % "postgresql" % "42.5.4" % "test",
313+
"software.amazon.jdbc" % "aws-advanced-jdbc-wrapper" % "2.5.6" % "test",
314+
"com.mysql" % "mysql-connector-j" % "9.3.0" % "test",
312315
"org.xerial" % "sqlite-jdbc" % "3.34.0" % "test",
313-
"ch.vorburger.mariaDB4j" % "mariaDB4j" % "2.5.3" % "test"
316+
"org.testcontainers" % "postgresql" % "1.19.8" % "test",
317+
"org.testcontainers" % "mysql" % "1.19.8" % "test"
314318
)
315319
).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")
316320

instrumentation/kamon-jdbc/src/main/resources/reference.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,17 @@ kanela.modules {
6464
"oracle.jdbc.",
6565
"com.amazon.redshift.jdbc42.",
6666
"com.amazon.redshift.core.jdbc42.",
67+
"software.amazon.jdbc.",
6768
"com.mysql.jdbc.",
6869
"com.mysql.cj.jdbc.",
70+
"org.duckdb.",
6971
"org.h2.Driver",
7072
"org.h2.jdbc.",
7173
"org.hsqldb.jdbc.",
7274
"net.sf.log4jdbc.",
7375
"org.mariadb.jdbc.",
7476
"org.postgresql.jdbc.",
77+
"org.postgresql.Driver",
7578
"com.facebook.presto.jdbc.",
7679
"com.microsoft.sqlserver.jdbc.",
7780
"net.snowflake.client.jdbc.",

instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/StatementInstrumentation.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ import kanela.agent.api.instrumentation.bridge.FieldBridge
2828
import kanela.agent.libs.net.bytebuddy.asm.Advice
2929

3030
import scala.util.Try
31+
32+
import kanela.agent.libs.net.bytebuddy.description.`type`.TypeDescription
33+
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatcher
3134
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers._
3235

3336
import scala.annotation.static
@@ -57,13 +60,16 @@ class StatementInstrumentation extends InstrumentationBuilder {
5760
.mixin(classOf[HasDatabaseTags.Mixin])
5861
.mixin(classOf[HasConnectionPoolTelemetry.Mixin])
5962

60-
onTypesMatching(hasSuperType(named("java.sql.Statement")).and(not(nameStartsWith("com.zaxxer.hikari"))))
63+
private val ignoredDriverWrappers: ElementMatcher.Junction[TypeDescription] =
64+
nameStartsWith("com.zaxxer.hikari").or(nameStartsWith("software.amazon.jdbc.wrapper"))
65+
66+
onTypesMatching(hasSuperType(named("java.sql.Statement")).and(not(ignoredDriverWrappers)))
6167
.advise(method("execute").and(withOneStringArgument), classOf[StatementExecuteMethodAdvisor])
6268
.advise(method("executeQuery").and(withOneStringArgument), classOf[StatementExecuteQueryMethodAdvisor])
6369
.advise(method("executeUpdate").and(withOneStringArgument), classOf[StatementExecuteUpdateMethodAdvisor])
6470
.advise(anyMethods("executeBatch", "executeLargeBatch"), classOf[StatementExecuteBatchMethodAdvisor])
6571

66-
onTypesMatching(hasSuperType(named("java.sql.PreparedStatement")).and(not(nameStartsWith("com.zaxxer.hikari"))))
72+
onTypesMatching(hasSuperType(named("java.sql.PreparedStatement")).and(not(ignoredDriverWrappers)))
6773
.advise(method("execute"), classOf[PreparedStatementExecuteMethodAdvisor])
6874
.advise(method("executeQuery"), classOf[PreparedStatementExecuteQueryMethodAdvisor])
6975
.advise(method("executeUpdate"), classOf[PreparedStatementExecuteUpdateMethodAdvisor])
@@ -98,7 +104,7 @@ class StatementInstrumentation extends InstrumentationBuilder {
98104
case class DatabaseTags(metricTags: TagSet, spanTags: TagSet)
99105

100106
trait HasDatabaseTags {
101-
def databaseTags(): DatabaseTags
107+
def databaseTags: DatabaseTags
102108
def setDatabaseTags(databaseTags: DatabaseTags): Unit
103109
}
104110

@@ -111,7 +117,7 @@ object HasDatabaseTags {
111117
}
112118

113119
trait HasStatementSQL {
114-
def capturedStatementSQL(): String
120+
def capturedStatementSQL: String
115121
def setStatementSQL(sql: String): Unit
116122
}
117123

@@ -157,7 +163,7 @@ object CreateStatementAdvice {
157163

158164
@Advice.OnMethodExit
159165
@static def exit(@Advice.This connection: Any, @Advice.Return statement: Any): Unit = {
160-
statement.asInstanceOf[HasDatabaseTags].setDatabaseTags(connection.asInstanceOf[HasDatabaseTags].databaseTags())
166+
statement.asInstanceOf[HasDatabaseTags].setDatabaseTags(connection.asInstanceOf[HasDatabaseTags].databaseTags)
161167
statement.asInstanceOf[HasConnectionPoolTelemetry].setConnectionPoolTelemetry(
162168
connection.asInstanceOf[HasConnectionPoolTelemetry].connectionPoolTelemetry
163169
)
@@ -173,7 +179,7 @@ object CreatePreparedStatementAdvice {
173179
@Advice.Argument(0) sql: String,
174180
@Advice.Return statement: Any
175181
): Unit = {
176-
statement.asInstanceOf[HasDatabaseTags].setDatabaseTags(connection.asInstanceOf[HasDatabaseTags].databaseTags())
182+
statement.asInstanceOf[HasDatabaseTags].setDatabaseTags(connection.asInstanceOf[HasDatabaseTags].databaseTags)
177183
statement.asInstanceOf[HasConnectionPoolTelemetry].setConnectionPoolTelemetry(
178184
statement.asInstanceOf[HasConnectionPoolTelemetry].connectionPoolTelemetry
179185
)

instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/StatementMonitor.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ object StatementMonitor extends LoggingSupport {
9292
val poolTelemetry = cpt.connectionPoolTelemetry.get()
9393
(poolTelemetry.instruments.inFlightStatements, poolTelemetry.databaseTags)
9494

95-
case dbt: HasDatabaseTags if dbt.databaseTags() != null =>
96-
(JdbcMetrics.InFlightStatements.withTags(dbt.databaseTags().metricTags), dbt.databaseTags())
95+
case dbt: HasDatabaseTags if dbt.databaseTags != null =>
96+
(JdbcMetrics.InFlightStatements.withTags(dbt.databaseTags.metricTags), dbt.databaseTags)
9797

9898
case _ =>
9999
(JdbcMetrics.InFlightStatements.withoutTags(), DatabaseTags(TagSet.Empty, TagSet.Empty))

instrumentation/kamon-jdbc/src/main/scala/kamon/instrumentation/jdbc/advisor/StatementInstrumentationAdvisors.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class PreparedStatementExecuteMethodAdvisor
4747
object PreparedStatementExecuteMethodAdvisor {
4848
@Advice.OnMethodEnter(suppress = classOf[Throwable])
4949
@static def executeStart(@Advice.This statement: HasStatementSQL): Option[Invocation] = {
50-
StatementMonitor.start(statement, statement.capturedStatementSQL(), StatementTypes.GenericExecute)
50+
StatementMonitor.start(statement, statement.capturedStatementSQL, StatementTypes.GenericExecute)
5151
}
5252

5353
@Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable])
@@ -79,7 +79,7 @@ class PreparedStatementExecuteQueryMethodAdvisor
7979
object PreparedStatementExecuteQueryMethodAdvisor {
8080
@Advice.OnMethodEnter(suppress = classOf[Throwable])
8181
@static def executeStart(@Advice.This statement: HasStatementSQL): Option[Invocation] = {
82-
StatementMonitor.start(statement, statement.capturedStatementSQL(), StatementTypes.Query)
82+
StatementMonitor.start(statement, statement.capturedStatementSQL, StatementTypes.Query)
8383
}
8484

8585
@Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable])
@@ -111,7 +111,7 @@ class PreparedStatementExecuteUpdateMethodAdvisor
111111
object PreparedStatementExecuteUpdateMethodAdvisor {
112112
@Advice.OnMethodEnter(suppress = classOf[Throwable])
113113
@static def executeStart(@Advice.This statement: HasStatementSQL): Option[Invocation] = {
114-
StatementMonitor.start(statement, statement.capturedStatementSQL(), StatementTypes.Update)
114+
StatementMonitor.start(statement, statement.capturedStatementSQL, StatementTypes.Update)
115115
}
116116

117117
@Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable])
@@ -130,7 +130,7 @@ object StatementExecuteBatchMethodAdvisor {
130130
@Advice.OnMethodEnter(suppress = classOf[Throwable])
131131
@static def executeStart(@Advice.This statement: Any): Option[Invocation] = {
132132
val statementSQL = statement match {
133-
case hSQL: HasStatementSQL => hSQL.capturedStatementSQL()
133+
case hSQL: HasStatementSQL => hSQL.capturedStatementSQL
134134
case _ => statement.toString
135135
}
136136

instrumentation/kamon-jdbc/src/test/scala/kamon/instrumentation/jdbc/HikariInstrumentationSpec.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.scalatest.matchers.should.Matchers
2828
import org.scalatest.time.SpanSugar
2929
import org.scalatest.wordspec.AnyWordSpec
3030
import org.scalatest.{BeforeAndAfterEach, OptionValues}
31+
import org.testcontainers.containers.PostgreSQLContainer
3132

3233
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
3334

@@ -42,7 +43,7 @@ class HikariInstrumentationSpec extends AnyWordSpec
4243
with InitAndStopKamonAfterAll
4344
with OptionValues {
4445

45-
import HikariInstrumentationSpec.{createH2Pool, createSQLitePool}
46+
import HikariInstrumentationSpec.{createH2Pool, createSQLitePool, createAwsWrapperPool}
4647
implicit val parallelQueriesContext: ExecutionContextExecutor =
4748
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))
4849

@@ -55,6 +56,7 @@ class HikariInstrumentationSpec extends AnyWordSpec
5556
)
5657

5758
"the Hikari instrumentation" should {
59+
5860
"track each hikari pool using the pool name as tag and cleanup after closing the pool" in {
5961
val pool1 = createH2Pool("example-1")
6062
val pool2 = createH2Pool("example-2")
@@ -209,6 +211,20 @@ class HikariInstrumentationSpec extends AnyWordSpec
209211
connection.close()
210212
pool.close()
211213
}
214+
215+
"construct pools with the AWS wrapper" in {
216+
val postgresqlContainer = new PostgreSQLContainer("postgres:16")
217+
postgresqlContainer.start()
218+
val url = postgresqlContainer
219+
.getJdbcUrl()
220+
.replace("postgresql", "aws-wrapper:postgresql") + "&user=test&password=test"
221+
222+
val pool = createAwsWrapperPool("aws-wrapper", url)
223+
val connection = pool.getConnection()
224+
connection.close()
225+
pool.close()
226+
postgresqlContainer.stop()
227+
}
212228
}
213229
}
214230

@@ -244,6 +260,16 @@ object HikariInstrumentationSpec {
244260
hikariPool
245261
}
246262

263+
def createAwsWrapperPool(name: String, url: String, size: Int = 1): HikariDataSource = {
264+
System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "200")
265+
266+
val config = basicConfig(name, size)
267+
config.setJdbcUrl(url)
268+
269+
val hikariPool = new HikariDataSource(config)
270+
hikariPool
271+
}
272+
247273
private def basicConfig(name: String, size: Int): HikariConfig = {
248274
val config = new HikariConfig()
249275
config.setConnectionInitSql("SELECT 1;")

instrumentation/kamon-jdbc/src/test/scala/kamon/instrumentation/jdbc/StatementInstrumentationSpec.scala

Lines changed: 73 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
package kamon.instrumentation.jdbc
1717

18-
import ch.vorburger.mariadb4j.DB
1918
import com.typesafe.config.ConfigFactory
2019
import kamon.Kamon
2120
import kamon.tag.Lookups._
@@ -28,6 +27,8 @@ import org.scalatest.matchers.should.Matchers
2827
import org.scalatest.time.SpanSugar
2928
import org.scalatest.wordspec.AnyWordSpec
3029
import org.scalatest.{BeforeAndAfterEach, OptionValues}
30+
import org.testcontainers.containers.PostgreSQLContainer
31+
import org.testcontainers.containers.MySQLContainer
3132

3233
import java.sql.{Connection, DriverManager, ResultSet}
3334
import java.time.Duration
@@ -65,11 +66,13 @@ class StatementInstrumentationSpec extends AnyWordSpec
6566
override implicit def patienceConfig: PatienceConfig = PatienceConfig(timeout = scaled(2 seconds))
6667

6768
val drivers = Seq(
69+
DriverSuite.Postgres,
6870
DriverSuite.H2,
6971
DriverSuite.SQLite,
7072
DriverSuite.MySQL,
71-
DriverSuite.HikariH2
72-
).filter(canRunInCurrentEnvironment)
73+
DriverSuite.HikariH2,
74+
DriverSuite.PostgresWithAWSAdvancedDriver
75+
)
7376

7477
"the StatementInstrumentation" when {
7578
drivers.foreach { driver =>
@@ -265,7 +268,7 @@ class StatementInstrumentationSpec extends AnyWordSpec
265268
if (driver.supportSleeping) {
266269
val vendorTags = TagSet.of("db.vendor", driver.vendor)
267270

268-
for (id 1 to 10) yield {
271+
for (id <- 1 to 10) yield {
269272
Future {
270273
val connection = driver.connect()
271274
driver.sleep(connection, Duration.ofMillis(1500))
@@ -437,35 +440,90 @@ class StatementInstrumentationSpec extends AnyWordSpec
437440
object MySQL extends DriverSuite with AddressTableSetup {
438441
val name = "MySQL"
439442
val vendor = "mysql"
440-
val url = "jdbc:mysql://localhost/test"
441443
val supportSleeping = false
444+
lazy val mysqlContainer = new MySQLContainer("mysql:8.4.5")
445+
lazy val url = mysqlContainer.getJdbcUrl() + "?user=test&password=test"
442446
lazy val connection = DriverManager.getConnection(url)
443447

444448
override def init(): Unit = {
445-
DB.newEmbeddedDB(3306).start()
449+
mysqlContainer.start()
446450
initializeAddressTable(connect())
447451
}
448452

449453
override def connect(): Connection = connection
450454

451455
override def sleep(connection: Connection, duration: Duration): Unit = {}
452456

453-
override def cleanup() = connection.close()
457+
override def cleanup() = {
458+
connection.close()
459+
mysqlContainer.stop()
460+
}
461+
}
462+
463+
object Postgres extends DriverSuite with AddressTableSetup {
464+
val name = "Postgres"
465+
val vendor = "postgresql"
466+
val supportSleeping = false
467+
lazy val postgresqlContainer = new PostgreSQLContainer("postgres:16")
468+
lazy val url = postgresqlContainer.getJdbcUrl() + "&user=test&password=test"
469+
lazy val connection = DriverManager.getConnection(url)
470+
471+
override def init(): Unit = {
472+
postgresqlContainer.start()
473+
initializeAddressTable(connect())
474+
}
475+
476+
override def connect(): Connection = connection
477+
478+
override def sleep(connection: Connection, duration: Duration): Unit = {}
479+
480+
override def cleanup() = {
481+
connection.close()
482+
postgresqlContainer.stop()
483+
}
484+
}
485+
486+
object PostgresWithAWSAdvancedDriver extends DriverSuite with AddressTableSetup {
487+
val name = "Postgres with AWS Advanced Driver"
488+
val vendor = "postgresql"
489+
val supportSleeping = false
490+
491+
// It seems like the AWS wrapper will remove all parameters we pass on the
492+
// connection string and turn them into properties before passing them to
493+
// the underlying driver, so we need to cleanup the URL for all assertions
494+
// to work as epected.
495+
val connectionProperties = new java.util.Properties()
496+
connectionProperties.setProperty("user", "test")
497+
connectionProperties.setProperty("password", "test")
498+
lazy val postgresqlContainer = new PostgreSQLContainer("postgres:16")
499+
lazy val url = postgresqlContainer.getJdbcUrl().replace("?loggerLevel=OFF", "")
500+
lazy val urlForConnection = url.replace("postgresql", "aws-wrapper:postgresql")
501+
lazy val connection = DriverManager.getConnection(urlForConnection, connectionProperties)
502+
503+
override def init(): Unit = {
504+
postgresqlContainer.start()
505+
initializeAddressTable(connect())
506+
}
507+
508+
override def connect(): Connection = connection
509+
510+
override def sleep(connection: Connection, duration: Duration): Unit = {}
511+
512+
override def cleanup() = {
513+
connection.close()
514+
postgresqlContainer.stop()
515+
}
454516
}
455517

456518
trait AddressTableSetup {
457519
def initializeAddressTable(connection: Connection): Unit = {
458520
connection.createStatement().executeUpdate("DROP TABLE IF EXISTS Address;")
459521
connection.createStatement().executeUpdate("CREATE TABLE Address (Nr INTEGER, Name VARCHAR(128));")
460-
connection.createStatement().executeUpdate(s"INSERT INTO Address (Nr, Name) VALUES(1, 'foo')")
461-
connection.createStatement().executeUpdate(s"INSERT INTO Address (Nr, Name) VALUES(2, 'foo')")
462-
connection.createStatement().executeUpdate(s"INSERT INTO Address (Nr, Name) VALUES(3, 'foo')")
463-
connection.createStatement().executeUpdate(s"INSERT INTO Address (Nr, Name) VALUES(4, 'foo')")
522+
connection.createStatement().executeUpdate("INSERT INTO Address (Nr, Name) VALUES(1, 'foo')")
523+
connection.createStatement().executeUpdate("INSERT INTO Address (Nr, Name) VALUES(2, 'foo')")
524+
connection.createStatement().executeUpdate("INSERT INTO Address (Nr, Name) VALUES(3, 'foo')")
525+
connection.createStatement().executeUpdate("INSERT INTO Address (Nr, Name) VALUES(4, 'foo')")
464526
}
465527
}
466528
}
467-
468-
def canRunInCurrentEnvironment(driverSuite: DriverSuite): Boolean = {
469-
!(System.getenv("TRAVIS") != null && driverSuite.vendor == "mysql")
470-
}
471529
}

project/Build.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ object BaseProject extends AutoPlugin {
3535
/** Marker configuration for dependencies that will be shaded into their module's jar. */
3636
lazy val Shaded = config("shaded").hide
3737

38-
val kanelaAgent = "io.kamon" % "kanela-agent" % "2.0.0-beta.1"
38+
val kanelaAgent = "io.kamon" % "kanela-agent" % "2.0.0-beta.3"
3939
val slf4jApi = "org.slf4j" % "slf4j-api" % "2.0.17"
4040
val slf4jnop = "org.slf4j" % "slf4j-nop" % "2.0.17"
4141
val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.3.15"

0 commit comments

Comments
 (0)