Skip to content

Commit 8c36c01

Browse files
committed
feat: add file stream utility
1 parent 4a1093c commit 8c36c01

File tree

3 files changed

+237
-0
lines changed

3 files changed

+237
-0
lines changed

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ zstd = { version = "0.13.3", optional = true }
5858
[features]
5959
client = ["dep:tokio-rustls", "dep:rustls", "dep:webpki-roots"]
6060
default = []
61+
file-stream = []
6162
http2 = []
6263
jemalloc = ["dep:tikv-jemallocator"]
6364
multipart = ["dep:multer", "dep:uuid"]
@@ -77,6 +78,11 @@ doctest = false
7778
name = "auth"
7879
path = "examples/auth/src/main.rs"
7980

81+
[[example]]
82+
name = "file-stream"
83+
path = "examples/file-stream/src/main.rs"
84+
required-features = ["file-stream"]
85+
8086
[[example]]
8187
name = "hello-world"
8288
path = "examples/hello-world/src/main.rs"

src/file_stream.rs

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
//! File streaming utilities for efficient HTTP file delivery.
2+
//!
3+
//! This module provides `FileStream` for streaming files over HTTP with support for
4+
//! range requests, content-length headers, and proper MIME type detection. It enables
5+
//! efficient delivery of large files without loading them entirely into memory, making
6+
//! it suitable for serving media files, downloads, and other binary content.
7+
//!
8+
//! # Examples
9+
//!
10+
//! ```rust
11+
//! use tako::file_stream::FileStream;
12+
//! use tako::responder::Responder;
13+
//! use tokio_util::io::ReaderStream;
14+
//! use tokio::fs::File;
15+
//!
16+
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
17+
//! // Stream a file from disk
18+
//! let file_stream = FileStream::from_path("./assets/video.mp4").await?;
19+
//! let response = file_stream.into_response();
20+
//!
21+
//! // Create a custom stream with metadata
22+
//! let file = File::open("./data.bin").await?;
23+
//! let reader_stream = ReaderStream::new(file);
24+
//! let custom_stream = FileStream::new(
25+
//! reader_stream,
26+
//! Some("download.bin".to_string()),
27+
//! Some(1024),
28+
//! );
29+
//! let response = custom_stream.into_response();
30+
//! # Ok(())
31+
//! # }
32+
//! ```
33+
34+
use std::{io::SeekFrom, path::Path};
35+
36+
use anyhow::Result;
37+
use bytes::Bytes;
38+
use futures_util::{TryStream, TryStreamExt};
39+
use hyper::{StatusCode, body::Frame};
40+
use tokio::{
41+
fs::File,
42+
io::{AsyncReadExt, AsyncSeekExt},
43+
};
44+
use tokio_util::io::ReaderStream;
45+
46+
use crate::{
47+
body::TakoBody,
48+
responder::Responder,
49+
types::{BoxError, Response},
50+
};
51+
52+
/// HTTP file stream with metadata support for efficient file delivery.
53+
///
54+
/// `FileStream` wraps any stream that produces bytes and associates it with optional
55+
/// metadata like filename and content size. This enables proper HTTP headers to be
56+
/// set for file downloads, including Content-Disposition for filename suggestions
57+
/// and Content-Length for known file sizes. The implementation supports both
58+
/// regular responses and HTTP range requests for partial content delivery.
59+
///
60+
/// # Examples
61+
///
62+
/// ```rust
63+
/// use tako::file_stream::FileStream;
64+
/// use tokio_util::io::ReaderStream;
65+
/// use tokio::fs::File;
66+
///
67+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
68+
/// // From file path (recommended)
69+
/// let stream = FileStream::from_path("./video.mp4").await?;
70+
///
71+
/// // From custom stream
72+
/// let file = File::open("./data.txt").await?;
73+
/// let reader = ReaderStream::new(file);
74+
/// let stream = FileStream::new(reader, Some("data.txt".to_string()), Some(2048));
75+
/// # Ok(())
76+
/// # }
77+
/// ```
78+
pub struct FileStream<S> {
79+
/// The underlying byte stream
80+
pub stream: S,
81+
/// Optional filename for Content-Disposition header
82+
pub file_name: Option<String>,
83+
/// Optional content size for Content-Length header
84+
pub content_size: Option<u64>,
85+
}
86+
87+
impl<S> FileStream<S>
88+
where
89+
S: TryStream + Send + 'static,
90+
S::Ok: Into<Bytes>,
91+
S::Error: Into<BoxError>,
92+
{
93+
/// Creates a new file stream with the provided metadata.
94+
pub fn new(stream: S, file_name: Option<String>, content_size: Option<u64>) -> Self {
95+
Self {
96+
stream,
97+
file_name,
98+
content_size,
99+
}
100+
}
101+
102+
/// Creates a file stream from a file system path with automatic metadata detection.
103+
pub async fn from_path<P>(path: P) -> Result<FileStream<ReaderStream<File>>>
104+
where
105+
P: AsRef<Path>,
106+
{
107+
let file = File::open(&path).await?;
108+
let mut content_size = None;
109+
let mut file_name = None;
110+
111+
if let Ok(metadata) = file.metadata().await {
112+
content_size = Some(metadata.len());
113+
}
114+
115+
if let Some(os_name) = path.as_ref().file_name()
116+
&& let Some(name) = os_name.to_str()
117+
{
118+
file_name = Some(name.to_owned());
119+
}
120+
121+
Ok(FileStream {
122+
stream: ReaderStream::new(file),
123+
file_name,
124+
content_size,
125+
})
126+
}
127+
128+
/// Creates an HTTP 206 Partial Content response for range requests.
129+
pub fn into_range_response(self, start: u64, end: u64, total_size: u64) -> Response {
130+
let mut response = hyper::Response::builder()
131+
.status(hyper::StatusCode::PARTIAL_CONTENT)
132+
.header(
133+
hyper::header::CONTENT_TYPE,
134+
mime::APPLICATION_OCTET_STREAM.as_ref(),
135+
)
136+
.header(
137+
hyper::header::CONTENT_RANGE,
138+
format!("bytes {}-{}/{}", start, end, total_size),
139+
)
140+
.header(hyper::header::CONTENT_LENGTH, (end - start + 1).to_string());
141+
142+
if let Some(ref name) = self.file_name {
143+
response = response.header(
144+
hyper::header::CONTENT_DISPOSITION,
145+
format!("attachment; filename=\"{}\"", name),
146+
);
147+
}
148+
149+
let body = TakoBody::from_try_stream(
150+
self.stream
151+
.map_ok(|chunk| Frame::data(Into::<Bytes>::into(chunk)))
152+
.map_err(Into::into),
153+
);
154+
155+
response.body(body).unwrap_or_else(|e| {
156+
(
157+
hyper::StatusCode::INTERNAL_SERVER_ERROR,
158+
format!("FileStream range error: {}", e),
159+
)
160+
.into_response()
161+
})
162+
}
163+
164+
/// Try to create a range response for a file stream.
165+
pub async fn try_range_response<P>(path: P, start: u64, mut end: u64) -> Result<Response>
166+
where
167+
P: AsRef<Path>,
168+
{
169+
let mut file = File::open(path).await?;
170+
let meta = file.metadata().await?;
171+
let total_size = meta.len();
172+
173+
if end == 0 {
174+
end = total_size - 1;
175+
}
176+
177+
if start > total_size || start > end || end >= total_size {
178+
return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
179+
}
180+
181+
file.seek(SeekFrom::Start(start)).await?;
182+
let stream = ReaderStream::new(file.take(end - start + 1));
183+
Ok(FileStream::new(stream, None, None).into_range_response(start, end, total_size))
184+
}
185+
}
186+
187+
impl<S> Responder for FileStream<S>
188+
where
189+
S: TryStream + Send + 'static,
190+
S::Ok: Into<Bytes>,
191+
S::Error: Into<BoxError>,
192+
{
193+
/// Converts the file stream into an HTTP response with appropriate headers.
194+
fn into_response(self) -> Response {
195+
let mut response = hyper::Response::builder()
196+
.status(hyper::StatusCode::OK)
197+
.header(
198+
hyper::header::CONTENT_TYPE,
199+
mime::APPLICATION_OCTET_STREAM.as_ref(),
200+
);
201+
202+
if let Some(size) = self.content_size {
203+
response = response.header(hyper::header::CONTENT_LENGTH, size.to_string());
204+
}
205+
206+
if let Some(ref name) = self.file_name {
207+
response = response.header(
208+
hyper::header::CONTENT_DISPOSITION,
209+
format!("attachment; filename=\"{}\"", name),
210+
);
211+
}
212+
213+
let body = TakoBody::from_try_stream(
214+
self.stream
215+
.map_ok(|chunk| Frame::data(Into::<Bytes>::into(chunk)))
216+
.map_err(Into::into),
217+
);
218+
219+
response.body(body).unwrap_or_else(|e| {
220+
(
221+
hyper::StatusCode::INTERNAL_SERVER_ERROR,
222+
format!("FileStream error: {}", e),
223+
)
224+
.into_response()
225+
})
226+
}
227+
}

src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ pub mod client;
3131
/// Request data extraction utilities for parsing query params, JSON, and more.
3232
pub mod extractors;
3333

34+
/// File streaming utilities for serving files.
35+
#[cfg(feature = "file-stream")]
36+
pub mod file_stream;
37+
3438
/// Request handler traits and implementations.
3539
mod handler;
3640

0 commit comments

Comments
 (0)