|
24 | 24 | from ._proto import ffi_pb2 as proto_ffi
|
25 | 25 | from ._proto import participant_pb2 as proto_participant
|
26 | 26 | from ._proto import room_pb2 as proto_room
|
| 27 | +from ._proto import stats_pb2 as proto_stats |
27 | 28 | from ._proto.room_pb2 import ConnectionState
|
28 | 29 | from ._proto.track_pb2 import TrackKind
|
29 | 30 | from ._proto.rpc_pb2 import RpcMethodInvocationEvent
|
@@ -120,6 +121,12 @@ class SipDTMF:
|
120 | 121 | """Participant who sent the DTMF digit. None when sent by a server SDK."""
|
121 | 122 |
|
122 | 123 |
|
| 124 | +@dataclass |
| 125 | +class RtcStats: |
| 126 | + publisher_stats: list[proto_stats.RtcStats] |
| 127 | + subscriber_stats: list[proto_stats.RtcStats] |
| 128 | + |
| 129 | + |
123 | 130 | class ConnectError(Exception):
|
124 | 131 | def __init__(self, message: str):
|
125 | 132 | self.message = message
|
@@ -408,6 +415,30 @@ def on_participant_connected(participant):
|
408 | 415 | # start listening to room events
|
409 | 416 | self._task = self._loop.create_task(self._listen_task())
|
410 | 417 |
|
| 418 | + async def get_rtc_stats(self) -> RtcStats: |
| 419 | + if not self.isconnected(): |
| 420 | + raise RuntimeError("the room isn't connected") |
| 421 | + |
| 422 | + req = proto_ffi.FfiRequest() |
| 423 | + req.get_session_stats.room_handle = self._ffi_handle.handle # type: ignore |
| 424 | + |
| 425 | + queue = FfiClient.instance.queue.subscribe() |
| 426 | + try: |
| 427 | + resp = FfiClient.instance.request(req) |
| 428 | + cb: proto_ffi.FfiEvent = await queue.wait_for( |
| 429 | + lambda e: e.get_session_stats.async_id == resp.get_session_stats.async_id |
| 430 | + ) |
| 431 | + finally: |
| 432 | + FfiClient.instance.queue.unsubscribe(queue) |
| 433 | + |
| 434 | + if cb.get_session_stats.error: |
| 435 | + raise RuntimeError(cb.get_session_stats.error) |
| 436 | + |
| 437 | + publisher_stats = list(cb.get_session_stats.result.publisher_stats) |
| 438 | + subscriber_stats = list(cb.get_session_stats.result.subscriber_stats) |
| 439 | + |
| 440 | + return RtcStats(publisher_stats=publisher_stats, subscriber_stats=subscriber_stats) |
| 441 | + |
411 | 442 | def register_byte_stream_handler(self, topic: str, handler: ByteStreamHandler):
|
412 | 443 | existing_handler = self._byte_stream_handlers.get(topic)
|
413 | 444 | if existing_handler is None:
|
@@ -446,6 +477,7 @@ async def disconnect(self) -> None:
|
446 | 477 | await queue.wait_for(lambda e: e.disconnect.async_id == resp.disconnect.async_id)
|
447 | 478 | finally:
|
448 | 479 | FfiClient.instance.queue.unsubscribe(queue)
|
| 480 | + |
449 | 481 | await self._task
|
450 | 482 | FfiClient.instance.queue.unsubscribe(self._ffi_queue)
|
451 | 483 |
|
|
0 commit comments