This repository was archived by the owner on Oct 7, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
This repository was archived by the owner on Oct 7, 2024. It is now read-only.
Question: how would i use DataReader.read_bytes()? #41
Copy link
Copy link
Closed
Description
Say have a DataReader set up from a buffer. And i want to read all bytes at once. so i feed the data reader an empty winsdk.system.array("B"). I know the buffer isnt empty. However it appears that read_bytes does not write to the array. Am I doing something wrong?
import asyncio
import PIL.Image as Image
import io
from winsdk.windows.media.control import GlobalSystemMediaTransportControlsSessionManager
from winsdk.windows.storage.streams import Buffer, InputStreamOptions, DataReader
import winsdk
from sys import argv, exit, stderr
THUMBNAIL_BUFFER_SIZE = 5 * 1024 * 1024
async def get_stream():
print("getting sessions")
sessions = await GlobalSystemMediaTransportControlsSessionManager.request_async()
print("getting playback")
playback_dict = await get_media_info(sessions.get_current_session())
try:
print("getting thumbnail ref")
r_a_Stream_ref: winsdk.windows.storage.streams.IRandomAccessStreamReference
r_a_Stream_ref = playback_dict["thumbnail"]
except KeyError:
print("None")
return 1
print("attempting to retrieve stream")
Async_operation = r_a_Stream_ref.open_read_async()
i = 0
while Async_operation.status == 0 and i <500:
if (i % 20) == 0:
print(i)
# I Wouldve used Async_operation.completed != True, however this throws a NotImplementedException
i+=1
#print(Async_operation.status) #this prints 0 indefinetly
if i == 500:
print("timeout")
return 1
print("retrieved stream")
buffer = Buffer(THUMBNAIL_BUFFER_SIZE)
readable_stream = Async_operation.get_results()
print("trying to read stream..")
await readable_stream.read_async(buffer, buffer.capacity, InputStreamOptions.READ_AHEAD)
#print(buffer)
buffer_reader = DataReader.from_buffer(buffer)
arr = winsdk.system.Array("B")
buffer_reader.read_bytes(arr)
print(arr.__len__())
byte_arr = bytearray(arr)
return byte_arr
async def main():
#print(byte_arr)
img = Image.open(io.BytesIO(await get_stream()))
img.save(".\\thumbnail.png", "PNG")
img.close()
return 0
def props_to_dict(props):
return {
attr: props.__getattribute__(attr) for attr in dir(props) if attr[0] != '_'
}
async def get_media_info(current_session):
if current_session:
media_props = await current_session.try_get_media_properties_async()
return {
song_attr: media_props.__getattribute__(song_attr)
for song_attr in dir(media_props)
if song_attr[0] != '_'
}
if __name__ == "__main__":
try:
result = asyncio.run(main())
except Exception as e:
if "debug" in argv:
print(e)
else:
print("err")
exit(1)
if result == 0:
print("ok")
else: exit(result)Metadata
Metadata
Assignees
Labels
No labels