Skip to content

Commit

Permalink
wait all socket closed
Browse files Browse the repository at this point in the history
  • Loading branch information
andrew (from workstation) committed Nov 1, 2020
1 parent d8c3cf4 commit ca8dfdf
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 20 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

setuptools.setup(
name="smart_tv_telegram",
version="1.1.0.dev0",
version="1.1.1.dev0",
author="andrew-ld",
author_email="[email protected]",
description="A Telegram Bot to stream content on your smart TV",
Expand Down
4 changes: 2 additions & 2 deletions smart_tv_telegram/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from .bot import Bot


__version__ = "1.1.0"
__version_info__ = ("1", "1", "0")
__version__ = "1.1.1"
__version_info__ = ("1", "1", "1")
__author__ = "https://github.com/andrew-ld"


Expand Down
57 changes: 43 additions & 14 deletions smart_tv_telegram/http.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import typing
from urllib.parse import quote

Expand All @@ -23,6 +24,7 @@ class Http:
_tokens: typing.Set[int]
_downloaded_blocks: typing.Dict[int, typing.Set[int]]
_stream_debounce: typing.Dict[int, AsyncDebounce]
_stream_trasports: typing.Dict[int, typing.Set[asyncio.Transport]]

def __init__(self, mtproto: Mtproto, config: Config):
self._mtproto = mtproto
Expand All @@ -31,6 +33,7 @@ def __init__(self, mtproto: Mtproto, config: Config):
self._tokens = set()
self._downloaded_blocks = dict()
self._stream_debounce = dict()
self._stream_trasports = dict()

async def start(self):
app = web.Application()
Expand Down Expand Up @@ -96,21 +99,45 @@ def _feed_downloaded_blocks(self, block_id: int, local_token: int):
downloaded_blocks = self._downloaded_blocks.setdefault(local_token, set())
downloaded_blocks.add(block_id)

def _feed_stream_trasport(self, local_token: int, transport: asyncio.Transport):
transports = self._stream_trasports.setdefault(local_token, set())
transports.add(transport)

def _get_stream_transports(self, local_token: int) -> typing.Set[asyncio.Transport]:
return self._stream_trasports[local_token] if local_token in self._stream_trasports else set()

async def _timeout_handler(self, message_id: int, chat_id: int, local_token: int, size: int):
blocks = size // self._config.block_size
remain_blocks = blocks - len(self._downloaded_blocks[local_token])
remain_blocks_percentual = remain_blocks / blocks * 100

self._tokens.remove(local_token)
del self._downloaded_blocks[local_token]
_debounce = self._stream_debounce[local_token] # avoid garbage collector
del self._stream_debounce[local_token]

await self._mtproto.reply_message(
message_id,
chat_id,
f"download closed, {remain_blocks_percentual:0.2f}% remains"
)
if all(t.is_closing() for t in self._get_stream_transports(local_token)):
blocks = (size // self._config.block_size) + 1

if local_token in self._tokens:
self._tokens.remove(local_token)

remain_blocks = blocks

if local_token in self._downloaded_blocks:
remain_blocks = blocks - len(self._downloaded_blocks[local_token])
del self._downloaded_blocks[local_token]

_debounce = None

if local_token in self._stream_debounce:
_debounce = self._stream_debounce[local_token] # avoid garbage collector
del self._stream_debounce[local_token]

if local_token in self._stream_trasports:
del self._stream_trasports[local_token]

remain_blocks_percentual = remain_blocks / blocks * 100

await self._mtproto.reply_message(
message_id,
chat_id,
f"download closed, {remain_blocks_percentual:0.2f}% remains"
)

if local_token in self._stream_debounce:
self._stream_debounce[local_token].reschedule()

async def _stream_handler(self, request: Request) -> typing.Optional[Response]:
_message_id: str = request.match_info["message_id"]
Expand Down Expand Up @@ -200,6 +227,8 @@ async def _stream_handler(self, request: Request) -> typing.Optional[Response]:
if request.transport is None:
break

self._feed_stream_trasport(local_token, request.transport)

if request.transport.is_closing():
break

Expand Down
17 changes: 14 additions & 3 deletions smart_tv_telegram/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
async def _debounce_wrap(
function: typing.Callable[..., typing.Coroutine],
args: typing.Tuple[typing.Any],
timeout: int
timeout: int,
):
await asyncio.sleep(timeout)
await function(*args)
Expand All @@ -43,20 +43,31 @@ class AsyncDebounce:
_function: typing.Callable[..., typing.Coroutine]
_timeout: int
_task: typing.Optional[asyncio.Task] = None
_args: typing.Optional[typing.Tuple[typing.Any]] = None

def __init__(self, function: typing.Callable[..., typing.Coroutine], timeout: int):
self._function = function
self._timeout = timeout

def _run(self) -> bool:
if self._args is None:
return False

self._task = _LOOP.create_task(_debounce_wrap(self._function, self._args, self._timeout))
return True

def update_args(self, *args) -> bool:
if self._task is not None and self._task.done():
return False

if self._task is not None:
self._task.cancel()

self._task = _LOOP.create_task(_debounce_wrap(self._function, args, self._timeout))
return True
self._args = args
return self._run()

def reschedule(self):
return self._run()


def secret_token(nbytes: int = 8) -> int:
Expand Down

0 comments on commit ca8dfdf

Please sign in to comment.