Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 68 additions & 77 deletions zio-http/js/src/main/scala/zio/http/internal/FetchDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,31 @@ final case class FetchDriver() extends ZClient.Driver[Any, Scope, Throwable] {
requestBody: Body,
sslConfig: Option[ClientSSLConfig],
proxy: Option[Proxy],
)(implicit trace: Trace): ZIO[Scope, Throwable, Response] = {
)(implicit trace: Trace): ZIO[Scope, Throwable, Response] =
for {
jsBody <- FetchDriver.fromZBody(requestBody)
response <-
ZIO.fromFuture { implicit ec =>
val jsMethod = FetchDriver.fromZMethod(requestMethod)
val jsHeaders = js.Dictionary(requestHeaders.map(h => h.headerName -> h.renderedValue).toSeq: _*)
for {
response <- dom
.fetch(
url.encode,
new dom.RequestInit {
method = jsMethod
headers = jsHeaders
body = jsBody
},
)
.toFuture
} yield {
val respHeaders = Headers.fromIterable(response.headers.map(h => Header.Custom(h(0), h(1))))
val ct = respHeaders.get(Header.ContentType)
Response(
status = Status.fromInt(response.status),
headers = respHeaders,
body = FetchBodyInternal.fromResponse(response, ct.map(Body.ContentType.fromHeader)),
)
}

jsBody <- FetchDriver.fromZBody(requestBody)
jsMethod = FetchDriver.fromZMethod(requestMethod)
jsHeaders = js.Dictionary(requestHeaders.map(h => h.headerName -> h.renderedValue).toSeq: _*)
abortSignal <- FetchDriver.makeAbortSignal
response <-
ZIO.fromPromiseJS {
dom.fetch(
url.encode,
new dom.RequestInit {
method = jsMethod
headers = jsHeaders
body = jsBody
signal = abortSignal
},
)
}
} yield response
}
respHeaders = Headers.fromIterable(response.headers.map(h => Header.Custom(h(0), h(1))))
ct = respHeaders.get(Header.ContentType)
} yield Response(
status = Status.fromInt(response.status),
headers = respHeaders,
body = FetchBodyInternal.fromResponse(response, ct.map(Body.ContentType.fromHeader)),
)

override def disableStreaming(implicit ev1: Scope =:= Scope): ZClient.Driver[Any, Any, Throwable] =
FetchDriverBatched()
Expand All @@ -59,7 +53,7 @@ final case class FetchDriver() extends ZClient.Driver[Any, Scope, Throwable] {
trace: Trace,
ev: Scope =:= Scope,
): ZIO[Env1 & Scope, Throwable, Response] =
throw new UnsupportedOperationException("WebSockets are not supported in the js client yet.")
ZIO.die(new UnsupportedOperationException("WebSockets are not supported in the js client yet."))

}

Expand Down Expand Up @@ -89,6 +83,13 @@ object FetchDriver {
body.asArray.map { ar => Uint8Array.of(ArraySeq.unsafeWrapArray(ar.map(_.toShort)): _*) }
}

// Without this, if you have a streaming request, and disconnect from the stream, the connection will leak and stay open indefinitely.
private[http] def makeAbortSignal: URIO[Scope, dom.AbortSignal] =
for {
controller <- ZIO.succeed { new dom.AbortController() }
_ <- ZIO.addFinalizer { ZIO.succeed { controller.abort() } }
} yield controller.signal

}

private[http] final case class FetchDriverBatched() extends ZClient.Driver[Any, Any, Throwable] {
Expand All @@ -101,54 +102,44 @@ private[http] final case class FetchDriverBatched() extends ZClient.Driver[Any,
requestBody: Body,
sslConfig: Option[ClientSSLConfig],
proxy: Option[Proxy],
)(implicit trace: Trace): ZIO[Any, Throwable, Response] = {
for {
jsBody <- FetchDriver.fromZBody(requestBody)
response <-
ZIO.fromFuture { implicit ec =>
val jsMethod = FetchDriver.fromZMethod(requestMethod)
val jsHeaders = js.Dictionary(requestHeaders.map(h => h.headerName -> h.renderedValue).toSeq: _*)
for {
response <- dom
.fetch(
url.encode,
new dom.RequestInit {
method = jsMethod
headers = jsHeaders
body = jsBody
},
)
.toFuture
// fully materialize body; convert ArrayBuffer to Array[Byte] manually.
// Fallback to text() on clone if needed.
bytes <- response
.arrayBuffer()
.toFuture
.map { buf =>
val view = new scala.scalajs.js.typedarray.Uint8Array(buf)
val out = new Array[Byte](view.length)
var i = 0
while (i < view.length) { out(i) = view(i).toByte; i += 1 }
out
}
.recoverWith { case _ =>
response.clone().text().toFuture.map(_.getBytes(Charsets.Http))
}
} yield {
val respHeaders = Headers.fromIterable(response.headers.map(h => Header.Custom(h(0), h(1))))
val ct = respHeaders.get(Header.ContentType)
val clHeader = respHeaders.get(Header.ContentLength)
val cl = clHeader.orElse(Some(Header.ContentLength(bytes.length.toLong)))
Response(
status = Status.fromInt(response.status),
headers = respHeaders,
body = FetchBodyBatched(bytes, ct.map(Body.ContentType.fromHeader), cl),
)
}

)(implicit trace: Trace): ZIO[Any, Throwable, Response] =
ZIO.scoped {
for {
jsBody <- FetchDriver.fromZBody(requestBody)
jsMethod = FetchDriver.fromZMethod(requestMethod)
jsHeaders = js.Dictionary(requestHeaders.map(h => h.headerName -> h.renderedValue).toSeq: _*)
abortSignal <- FetchDriver.makeAbortSignal
response <- ZIO.fromPromiseJS {
dom.fetch(
url.encode,
new dom.RequestInit {
method = jsMethod
headers = jsHeaders
body = jsBody
signal = abortSignal
},
)
}
} yield response
}
bytes <- ZIO.fromPromiseJS { response.arrayBuffer() }.map { buf =>
val view = new scala.scalajs.js.typedarray.Uint8Array(buf)
val out = new Array[Byte](view.length)
var i = 0
while (i < view.length) { out(i) = view(i).toByte; i += 1 }
out
}.catchAllCause { _ =>
ZIO.logDebug("Error fetching body bytes, using text fetch instead") *>
ZIO.fromPromiseJS { response.clone().text() }.map(_.getBytes(Charsets.Http))
}
respHeaders = Headers.fromIterable(response.headers.map(h => Header.Custom(h(0), h(1))))
ct = respHeaders.get(Header.ContentType)
clHeader = respHeaders.get(Header.ContentLength)
cl = clHeader.orElse(Some(Header.ContentLength(bytes.length.toLong)))
} yield Response(
status = Status.fromInt(response.status),
headers = respHeaders,
body = FetchBodyBatched(bytes, ct.map(Body.ContentType.fromHeader), cl),
)
}

override def socket[Env1 <: Any](version: Version, url: URL, headers: Headers, app: WebSocketApp[Env1])(implicit
trace: Trace,
Expand Down
Loading