Skip to content

Commit 59e03c8

Browse files
authored
AudioStream play example with noise cancellation (#467)
1 parent 2c64165 commit 59e03c8

File tree

1 file changed

+158
-0
lines changed

1 file changed

+158
-0
lines changed

examples/play_audio_stream.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import asyncio
2+
import os
3+
import numpy as np
4+
import sounddevice as sd
5+
6+
from livekit import rtc, api
7+
from livekit.plugins import noise_cancellation
8+
9+
SAMPLERATE = 48000
10+
BLOCKSIZE = 480 # 10ms chunks at 48kHz
11+
CHANNELS = 1
12+
13+
14+
class AudioBuffer:
15+
def __init__(self, blocksize=BLOCKSIZE):
16+
self.blocksize = blocksize
17+
self.buffer = np.array([], dtype=np.int16)
18+
19+
def add_frame(self, frame_data):
20+
self.buffer = np.concatenate([self.buffer, frame_data])
21+
22+
def get_chunk(self):
23+
if len(self.buffer) >= self.blocksize:
24+
chunk = self.buffer[: self.blocksize]
25+
self.buffer = self.buffer[self.blocksize :]
26+
return chunk
27+
return None
28+
29+
def get_padded_chunk(self):
30+
if len(self.buffer) > 0:
31+
chunk = np.zeros(self.blocksize, dtype=np.int16)
32+
available = min(len(self.buffer), self.blocksize)
33+
chunk[:available] = self.buffer[:available]
34+
self.buffer = self.buffer[available:]
35+
return chunk
36+
return np.zeros(self.blocksize, dtype=np.int16)
37+
38+
39+
async def audio_player(queue: asyncio.Queue):
40+
"""Pull from the queue and stream audio using sounddevice."""
41+
buffer = AudioBuffer(BLOCKSIZE)
42+
43+
def callback(outdata, frames, time, status):
44+
if status:
45+
print(f"Audio callback status: {status}")
46+
47+
# Try to fill buffer from queue
48+
while not queue.empty():
49+
try:
50+
data = queue.get_nowait()
51+
buffer.add_frame(data)
52+
except asyncio.QueueEmpty:
53+
break
54+
55+
# Get exactly the right amount of data
56+
chunk = buffer.get_chunk()
57+
if chunk is not None:
58+
outdata[:] = chunk.reshape(-1, 1)
59+
else:
60+
# Not enough data, use what we have padded with zeros
61+
outdata[:] = buffer.get_padded_chunk().reshape(-1, 1)
62+
63+
stream = sd.OutputStream(
64+
samplerate=SAMPLERATE,
65+
channels=CHANNELS,
66+
blocksize=BLOCKSIZE,
67+
dtype="int16",
68+
callback=callback,
69+
latency="low",
70+
)
71+
with stream:
72+
while True:
73+
await asyncio.sleep(0.1) # keep the loop alive
74+
75+
76+
async def rtc_session(room, queue: asyncio.Queue):
77+
track: rtc.RemoteAudioTrack | None = None
78+
while not track:
79+
for participant in room.remote_participants.values():
80+
for t in participant.track_publications.values():
81+
if t.kind == rtc.TrackKind.KIND_AUDIO and t.subscribed:
82+
track = t.track
83+
break
84+
if track:
85+
break
86+
if not track:
87+
print("waiting for audio track")
88+
await asyncio.sleep(2)
89+
90+
stream = rtc.AudioStream.from_track(
91+
track=track,
92+
sample_rate=SAMPLERATE,
93+
num_channels=1,
94+
noise_cancellation=noise_cancellation.BVC(), # or NC()
95+
)
96+
97+
print("playing stream")
98+
try:
99+
# Process audio frames from the stream
100+
async for audio_frame_event in stream:
101+
frame = audio_frame_event.frame
102+
103+
audio_data = np.frombuffer(frame.data, dtype=np.int16)
104+
105+
try:
106+
await queue.put(audio_data)
107+
except asyncio.QueueFull:
108+
# Skip this frame if queue is full
109+
print("Warning: Audio queue full, dropping frame")
110+
continue
111+
112+
finally:
113+
# Clean up the stream when done
114+
await stream.aclose()
115+
116+
117+
async def main():
118+
queue = asyncio.Queue(maxsize=50)
119+
player_task = asyncio.create_task(audio_player(queue))
120+
121+
token = (
122+
api.AccessToken()
123+
.with_identity("python-bot")
124+
.with_name("Python Bot")
125+
.with_grants(
126+
api.VideoGrants(
127+
room_join=True,
128+
room="my-room",
129+
agent=True,
130+
)
131+
)
132+
.to_jwt()
133+
)
134+
url = os.getenv("LIVEKIT_URL")
135+
136+
room = rtc.Room()
137+
await room.connect(
138+
url,
139+
token,
140+
options=rtc.RoomOptions(
141+
auto_subscribe=True,
142+
),
143+
)
144+
print(f"Connected to room: {room.name}")
145+
146+
try:
147+
await rtc_session(room, queue)
148+
finally:
149+
# Clean up
150+
await room.disconnect()
151+
player_task.cancel()
152+
try:
153+
await player_task
154+
except asyncio.CancelledError:
155+
pass
156+
157+
158+
asyncio.run(main())

0 commit comments

Comments
 (0)