-
Notifications
You must be signed in to change notification settings - Fork 55
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
andrew (from workstation)
committed
Nov 5, 2020
1 parent
7f3243c
commit b3b98bf
Showing
6 changed files
with
315 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,3 +25,4 @@ urllib3==1.25.11 | |
voluptuous==0.12.0 | ||
yarl==1.6.2 | ||
zeroconf==0.28.6 | ||
aiounittest==1.4.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
import aiounittest | ||
|
||
from pyrogram.raw.types import Message as TlMessage, MessageMediaUnsupported, MessageMediaDocument, DocumentEmpty, \ | ||
Document | ||
from requests import Response | ||
|
||
from ..http import Http | ||
|
||
|
||
class StubException(Exception): | ||
pass | ||
|
||
|
||
class Stub: | ||
def __call__(self, *args, **kwargs): | ||
raise StubException() | ||
|
||
def __getattr__(self, item): | ||
raise StubException() | ||
|
||
|
||
def _call_stack_stub_helper(callers, f): | ||
caller = callers.pop(0) | ||
|
||
if caller[0] != f: | ||
raise StubException() | ||
|
||
def c(*args, **kwargs): | ||
if args != caller[1]: | ||
raise StubException() | ||
|
||
if kwargs != caller[2]: | ||
raise StubException() | ||
|
||
if isinstance(caller[3], Exception): | ||
raise caller[3] | ||
|
||
return caller[3] | ||
|
||
if caller[4]: | ||
async def async_c(*args, **kwargs): | ||
return c(*args, **kwargs) | ||
|
||
return async_c | ||
|
||
return c | ||
|
||
|
||
class CallStackStub: | ||
callers = None | ||
|
||
def __init__(self, callers): | ||
self.callers = callers | ||
|
||
def __getattr__(self, item): | ||
return _call_stack_stub_helper(self.callers, item) | ||
|
||
|
||
class KVStub: | ||
def __init__(self, **kv): | ||
for k, v in kv.items(): | ||
setattr(self, k, v) | ||
|
||
|
||
class AioHttpStubRequest: | ||
match_info: dict | ||
headers: dict | ||
|
||
def __init__(self): | ||
self.match_info = dict() | ||
self.headers = dict() | ||
|
||
|
||
# noinspection PyTypeChecker | ||
class TestHttp(aiounittest.AsyncTestCase): | ||
async def test_http_wrong_token(self): | ||
http = Http(Stub(), Stub()) | ||
|
||
request = AioHttpStubRequest() | ||
request.match_info["token"] = "1" | ||
request.match_info["message_id"] = "0" | ||
|
||
response = await http._stream_handler(request) | ||
|
||
self.assertEqual(response.status, 403) | ||
|
||
async def test_http_message_not_exists(self): | ||
mtproto = CallStackStub([("get_message", (10,), {}, ValueError(), True)]) | ||
config = KVStub(block_size=1024) | ||
|
||
http = Http(mtproto, config) | ||
http.add_remote_token(10, 1010) | ||
|
||
request = AioHttpStubRequest() | ||
request.match_info["message_id"] = "10" | ||
request.match_info["token"] = "1010" | ||
|
||
response = await http._stream_handler(request) | ||
|
||
self.assertEqual(response.status, 404) | ||
|
||
async def test_http_range_overflow_max_size(self): | ||
message = TlMessage(date=None, id=None, message=None, to_id=None) | ||
message.media = MessageMediaDocument() | ||
message.media.document = Document(id=None, access_hash=None, attributes=[], date=None, | ||
size=1023, dc_id=None, mime_type=None, file_reference=None) | ||
|
||
mtproto = CallStackStub([("get_message", (10,), {}, message, True)]) | ||
config = KVStub(block_size=1024) | ||
|
||
http = Http(mtproto, config) | ||
http.add_remote_token(10, 1010) | ||
|
||
request = AioHttpStubRequest() | ||
request.match_info["message_id"] = "10" | ||
request.match_info["token"] = "1010" | ||
request.headers["Range"] = "bytes=1090-10000/146515" | ||
|
||
response = await http._stream_handler(request) | ||
|
||
self.assertEqual(response.status, 400) | ||
|
||
async def test_http_message_wrong_document_type(self): | ||
message = TlMessage(date=None, id=None, message=None, to_id=None) | ||
message.media = MessageMediaDocument() | ||
message.media.document = DocumentEmpty(id=None) | ||
|
||
mtproto = CallStackStub([("get_message", (10,), {}, message, True)]) | ||
config = KVStub(block_size=1024) | ||
|
||
http = Http(mtproto, config) | ||
http.add_remote_token(10, 1010) | ||
|
||
request = AioHttpStubRequest() | ||
request.match_info["message_id"] = "10" | ||
request.match_info["token"] = "1010" | ||
|
||
response = await http._stream_handler(request) | ||
|
||
self.assertEqual(response.status, 404) | ||
|
||
async def test_http_messae_wrong_media_type(self): | ||
message = TlMessage(date=None, id=None, message=None, to_id=None) | ||
message.media = MessageMediaUnsupported() | ||
|
||
mtproto = CallStackStub([("get_message", (10,), {}, message, True)]) | ||
config = KVStub(block_size=1024) | ||
|
||
http = Http(mtproto, config) | ||
http.add_remote_token(10, 1010) | ||
|
||
request = AioHttpStubRequest() | ||
request.match_info["message_id"] = "10" | ||
request.match_info["token"] = "1010" | ||
|
||
response = await http._stream_handler(request) | ||
|
||
self.assertEqual(response.status, 404) | ||
|
||
async def test_http_filename_header(self): | ||
http = Http(Stub(), Stub()) | ||
|
||
response = Response() | ||
http._write_filename_header(response, "test") | ||
|
||
self.assertEqual(response.headers["Content-Disposition"], "inline; filename=\"test\"") | ||
|
||
async def test_http_filename_header_escape(self): | ||
http = Http(Stub(), Stub()) | ||
|
||
response = Response() | ||
http._write_filename_header(response, "t est\"") | ||
|
||
self.assertEqual(response.headers["Content-Disposition"], "inline; filename=\"t%20est%22\"") | ||
|
||
async def test_http_bad_massage_id(self): | ||
http = Http(Stub(), Stub()) | ||
http.add_remote_token(10, 1010) | ||
|
||
request = AioHttpStubRequest() | ||
request.match_info["message_id"] = "aaaa" | ||
request.match_info["token"] = "1010" | ||
|
||
response = await http._stream_handler(request) | ||
|
||
self.assertEqual(response.status, 401) | ||
|
||
async def test_invalid_aiohttp_parser(self): | ||
http = Http(Stub(), Stub()) | ||
request = AioHttpStubRequest() | ||
|
||
with self.assertRaises(KeyError): | ||
await http._stream_handler(request) | ||
|
||
async def test_http_bad_token(self): | ||
http = Http(Stub(), Stub()) | ||
http.add_remote_token(10, 1010) | ||
|
||
request = AioHttpStubRequest() | ||
request.match_info["message_id"] = "10" | ||
request.match_info["token"] = "aaaaa" | ||
|
||
response = await http._stream_handler(request) | ||
|
||
self.assertEqual(response.status, 401) | ||
|
||
async def test_http_right_token(self): | ||
http = Http(Stub(), Stub()) | ||
http.add_remote_token(10, 1010) | ||
|
||
request = AioHttpStubRequest() | ||
request.match_info["message_id"] = "10" | ||
request.match_info["token"] = "1010" | ||
|
||
with self.assertRaises(StubException): | ||
await http._stream_handler(request) | ||
|
||
async def test_http_bad_range(self): | ||
http = Http(Stub(), KVStub(block_size=1024)) | ||
http.add_remote_token(10, 1010) | ||
|
||
request = AioHttpStubRequest() | ||
request.match_info["message_id"] = "10" | ||
request.match_info["token"] = "1010" | ||
request.headers["Range"] = "aaa" | ||
|
||
response = await http._stream_handler(request) | ||
|
||
self.assertEqual(response.status, 400) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
import aiounittest | ||
|
||
from pyrogram.types import Message as BoxedMessage | ||
|
||
from ..tools import ascii_only, parse_http_range, build_uri, serialize_token, pyrogram_filename | ||
|
||
|
||
# noinspection PyTypeChecker | ||
class TestTools(aiounittest.AsyncTestCase): | ||
def test_ascii_only(self): | ||
self.assertEqual(ascii_only("abcAAA123"), "abcAAA123") | ||
self.assertEqual(ascii_only(chr(127)), chr(127)) | ||
self.assertEqual(ascii_only(chr(128)), "") | ||
self.assertEqual(ascii_only(chr(129)), "") | ||
self.assertEqual(ascii_only("aa" + chr(129)), "aa") | ||
|
||
def test_pyrogram_filename(self): | ||
class _NamedMedia: | ||
file_name: str = None | ||
|
||
def __init__(self, file_name: str): | ||
self.file_name = file_name | ||
|
||
document = BoxedMessage(document=_NamedMedia("doc1"), message_id=0) | ||
self.assertEqual(pyrogram_filename(document), "doc1") | ||
|
||
video = BoxedMessage(video=_NamedMedia("video1"), message_id=0) | ||
self.assertEqual(pyrogram_filename(video), "video1") | ||
|
||
audio = BoxedMessage(video=_NamedMedia("audio1"), message_id=0) | ||
self.assertEqual(pyrogram_filename(audio), "audio1") | ||
|
||
video_note = BoxedMessage(video_note=_NamedMedia("videonote1"), message_id=0) | ||
self.assertEqual(pyrogram_filename(video_note), "videonote1") | ||
|
||
animation = BoxedMessage(animation=_NamedMedia("animation1"), message_id=0) | ||
self.assertEqual(pyrogram_filename(animation), "animation1") | ||
|
||
def test_parse_http_range(self): | ||
self.assertEqual(parse_http_range("bytes=0-1023/146515", 1024), (0, 0, 1023)) | ||
self.assertEqual(parse_http_range("bytes=1000-1023/146515", 1024), (0, 1000, 1023)) | ||
self.assertEqual(parse_http_range("bytes=1090-1023/146515", 1024), (1024, 66, 1023)) | ||
self.assertEqual(parse_http_range("bytes=1090-aaa/146515", 1024), (1024, 66, None)) | ||
|
||
with self.assertRaises(ValueError): | ||
parse_http_range("", 1024) | ||
|
||
with self.assertRaises(ValueError): | ||
parse_http_range("bytes=aaaa-1023/146515", 1024) | ||
|
||
def test_serialize_token(self): | ||
self.assertEqual(serialize_token(0, 0), 0) | ||
self.assertEqual(serialize_token(1, 0), 1) | ||
self.assertEqual(serialize_token(1, 1), 18446744073709551617) | ||
self.assertEqual(serialize_token(1, 2), 36893488147419103233) | ||
self.assertEqual(serialize_token(2, 1), 18446744073709551618) | ||
self.assertEqual(serialize_token(2, 2), 36893488147419103234) | ||
|
||
def test_build_uri(self): | ||
class _Config: | ||
listen_port = 80 | ||
listen_host = "test" | ||
|
||
self.assertEqual(build_uri(_Config(), 0, 0), "http://test:80/stream/0/0") | ||
self.assertEqual(build_uri(_Config(), 1, 0), "http://test:80/stream/1/0") | ||
self.assertEqual(build_uri(_Config(), 0, 1), "http://test:80/stream/0/1") | ||
self.assertEqual(build_uri(_Config(), 1, 1), "http://test:80/stream/1/1") |