-
Say have a import asyncio
import PIL.Image as Image
import io
from winsdk.windows.media.control import GlobalSystemMediaTransportControlsSessionManager
from winsdk.windows.storage.streams import Buffer, InputStreamOptions, DataReader
import winsdk
from sys import argv, exit, stderr
THUMBNAIL_BUFFER_SIZE = 5 * 1024 * 1024
async def get_stream():
print("getting sessions")
sessions = await GlobalSystemMediaTransportControlsSessionManager.request_async()
print("getting playback")
playback_dict = await get_media_info(sessions.get_current_session())
try:
print("getting thumbnail ref")
r_a_Stream_ref: winsdk.windows.storage.streams.IRandomAccessStreamReference
r_a_Stream_ref = playback_dict["thumbnail"]
except KeyError:
print("None")
return 1
print("attempting to retrieve stream")
Async_operation = r_a_Stream_ref.open_read_async()
i = 0
while Async_operation.status == 0 and i <500:
if (i % 20) == 0:
print(i)
# I Wouldve used Async_operation.completed != True, however this throws a NotImplementedException
i+=1
#print(Async_operation.status) #this prints 0 indefinetly
if i == 500:
print("timeout")
return 1
print("retrieved stream")
buffer = Buffer(THUMBNAIL_BUFFER_SIZE)
readable_stream = Async_operation.get_results()
print("trying to read stream..")
await readable_stream.read_async(buffer, buffer.capacity, InputStreamOptions.READ_AHEAD)
#print(buffer)
buffer_reader = DataReader.from_buffer(buffer)
arr = winsdk.system.Array("B")
buffer_reader.read_bytes(arr)
print(arr.__len__())
byte_arr = bytearray(arr)
return byte_arr
async def main():
#print(byte_arr)
img = Image.open(io.BytesIO(await get_stream()))
img.save(".\\thumbnail.png", "PNG")
img.close()
return 0
def props_to_dict(props):
return {
attr: props.__getattribute__(attr) for attr in dir(props) if attr[0] != '_'
}
async def get_media_info(current_session):
if current_session:
media_props = await current_session.try_get_media_properties_async()
return {
song_attr: media_props.__getattribute__(song_attr)
for song_attr in dir(media_props)
if song_attr[0] != '_'
}
if __name__ == "__main__":
try:
result = asyncio.run(main())
except Exception as e:
if "debug" in argv:
print(e)
else:
print("err")
exit(1)
if result == 0:
print("ok")
else: exit(result) |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
I am asking because IMO calling read_byte() for every byte i want to read takes way too long |
Beta Was this translation helpful? Give feedback.
-
It is best to avoid |
Beta Was this translation helpful? Give feedback.
It is best to avoid
DataReader
and use Python APIs like thestruct
module instead.Buffer
objects can be passed directly to anything in Python that uses the Python buffer protocol. Soreturn bytearray(buffer)
would probably do what you want in this case.