Skip to content

Commit 100446f

Browse files
committed
add send_bytes method
1 parent 59e03c8 commit 100446f

File tree

3 files changed

+79
-2
lines changed

3 files changed

+79
-2
lines changed

livekit-protocol/protocol

Submodule protocol updated 98 files

livekit-rtc/livekit/rtc/participant.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import asyncio
1919
import os
2020
import mimetypes
21+
import io
2122
import aiofiles
2223
from typing import List, Union, Callable, Dict, Awaitable, Optional, Mapping, cast, TypeVar
2324
from abc import abstractmethod, ABC
@@ -664,6 +665,82 @@ async def send_file(
664665

665666
return writer.info
666667

668+
async def send_bytes(
669+
self,
670+
data: Union[bytes, bytearray, memoryview, io.IOBase],
671+
name: str,
672+
*,
673+
mime_type: str = "application/octet-stream",
674+
topic: str = "",
675+
destination_identities: Optional[List[str]] = None,
676+
attributes: Optional[Dict[str, str]] = None,
677+
stream_id: str | None = None,
678+
) -> ByteStreamInfo:
679+
"""
680+
Send in-memory bytes or a file-like object as a byte stream.
681+
682+
Accepts common Python byte/blob types: bytes, bytearray, memoryview, and readable io.IOBase
683+
(e.g., io.BytesIO, buffered readers). The name is used for the stream metadata.
684+
"""
685+
# Bytes-like input path
686+
if isinstance(data, (bytes, bytearray, memoryview)):
687+
buffer = bytes(data)
688+
total_size = len(buffer)
689+
690+
writer: ByteStreamWriter = await self.stream_bytes(
691+
name=name,
692+
total_size=total_size,
693+
mime_type=mime_type,
694+
attributes=attributes,
695+
stream_id=stream_id,
696+
destination_identities=destination_identities,
697+
topic=topic,
698+
)
699+
700+
offset = 0
701+
while offset < total_size:
702+
end = min(offset + STREAM_CHUNK_SIZE, total_size)
703+
await writer.write(buffer[offset:end])
704+
offset = end
705+
706+
await writer.aclose()
707+
return writer.info
708+
709+
# File-like input path
710+
if isinstance(data, io.IOBase) and data.readable():
711+
total_size: Optional[int] = None
712+
try:
713+
if data.seekable():
714+
current_pos = data.tell()
715+
data.seek(0, io.SEEK_END)
716+
end_pos = data.tell()
717+
total_size = end_pos - current_pos
718+
data.seek(current_pos, io.SEEK_SET)
719+
except Exception:
720+
total_size = None
721+
722+
writer = await self.stream_bytes(
723+
name=name,
724+
total_size=total_size,
725+
mime_type=mime_type,
726+
attributes=attributes,
727+
stream_id=stream_id,
728+
destination_identities=destination_identities,
729+
topic=topic,
730+
)
731+
732+
while True:
733+
chunk = data.read(STREAM_CHUNK_SIZE)
734+
if not chunk:
735+
break
736+
await writer.write(chunk)
737+
await writer.aclose()
738+
return writer.info
739+
740+
raise TypeError(
741+
"Unsupported data type for send_bytes. Expected bytes, bytearray, memoryview, or a readable io.IOBase."
742+
)
743+
667744
async def publish_track(
668745
self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions()
669746
) -> LocalTrackPublication:

0 commit comments

Comments
 (0)