Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a Python package & add a test suite #493

Open
wants to merge 8 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions smarthack/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Collection of scripts to flash Tuya IoT devices to alternative firmwares."""
1 change: 0 additions & 1 deletion scripts/tuya-discovery.py → smarthack/discovery.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env python3
# encoding: utf-8
"""
tuya-discovery.py
Created by kueblc on 2019-11-13.
Discover Tuya devices on the LAN via UDP broadcast
"""
Expand Down
1 change: 1 addition & 0 deletions scripts/mq_pub_15.py → smarthack/mqtt.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
# encoding: utf-8
# type: ignore
"""
mq_pub_15.py
Created by nano on 2018-11-22.
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env python3
# encoding: utf-8
"""
fake-registration-server.py
Created by nano on 2018-11-22.
Copyright (c) 2018 VTRUST. All rights reserved.
"""
Expand Down Expand Up @@ -143,7 +142,7 @@ def post(self):
answer["mediaMqttsUrl"] = options.addr
answer["aispeech"] = options.addr
self.reply(answer)
os.system("pkill -f smartconfig/main.py")
os.system("pkill -f smarthack.smartconfig.main")

elif(".active" in a):
print("Answer s.gw.dev.pk.active")
Expand All @@ -163,7 +162,8 @@ def post(self):
self.reply(answer)
print("TRIGGER UPGRADE IN 10 SECONDS")
protocol = "2.2" if encrypted else "2.1"
os.system("sleep 10 && ./mq_pub_15.py -i %s -p %s &" % (gwId, protocol))
mqttbin = os.path.dirname(os.path.abspath(__file__)) + "/" + "mqtt.py"
os.system("sleep 10 && %s -i %s -p %s &" % (mqttbin, gwId, protocol))

# Upgrade endpoints
elif(".updatestatus" in a):
Expand Down
1 change: 1 addition & 0 deletions smarthack/smartconfig/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""An ESP8266 SmartConfig implementation."""
2 changes: 1 addition & 1 deletion scripts/smartconfig/broadcast.py → smarthack/smartconfig/broadcast.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
broadcast strategy ported from https://github.com/tuyapi/link
"""

from crc import crc_8
from .crc import crc_8

broadcast_head = [1, 3, 6, 10]

Expand Down
1 change: 1 addition & 0 deletions scripts/smartconfig/crc.py → smarthack/smartconfig/crc.py
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
# encoding: utf-8
# type: ignore
"""
crc.py
Created by kueblc on 2019-01-25.
Expand Down
2 changes: 1 addition & 1 deletion scripts/smartconfig/main.py → smarthack/smartconfig/main.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
token = "00000000"
secret = "0101"

from smartconfig import smartconfig
from .smartconfig import smartconfig
from time import sleep

print('Put device in EZ config mode (blinking fast)')
Expand Down
2 changes: 1 addition & 1 deletion scripts/smartconfig/multicast.py → smarthack/smartconfig/multicast.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
multicast strategy reverse engineered by kueblc
"""

from crc import crc_32
from .crc import crc_32

from Cryptodome.Cipher import AES
pad = lambda data, block_size : data + ('\0' * ( (block_size - len(data)) % block_size ) )
Expand Down
4 changes: 2 additions & 2 deletions scripts/smartconfig/smartconfig.py → smarthack/smartconfig/smartconfig.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def send_multicast( self, data ):
self._socket.sendto( b'\0', (ip, 30012))
sleep(self._gap)

from broadcast import broadcast_head, encode_broadcast_body
from multicast import multicast_head, encode_multicast_body
from .broadcast import broadcast_head, encode_broadcast_body
from .multicast import multicast_head, encode_multicast_body

def smartconfig( password, ssid, region, token, secret ):
sock = SmartConfigSocket()
Expand Down
12 changes: 6 additions & 6 deletions start_flash.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ setup () {
echo
sleep 5
echo " Starting web server in a screen"
$screen_with_log smarthack-web.log -S smarthack-web -m -d ./fake-registration-server.py
$screen_with_log smarthack-web.log -S smarthack-web -m -d ../smarthack/registration.py
echo " Starting Mosquitto in a screen"
$screen_with_log smarthack-mqtt.log -S smarthack-mqtt -m -d mosquitto -v
echo " Starting PSK frontend in a screen"
$screen_with_log smarthack-psk.log -S smarthack-psk -m -d ./psk-frontend.py -v
$screen_with_log smarthack-psk.log -S smarthack-psk -m -d ../smarthack/pskproxy.py -v
echo " Starting Tuya Discovery in a screen"
$screen_with_log smarthack-udp.log -S smarthack-udp -m -d ./tuya-discovery.py
$screen_with_log smarthack-udp.log -S smarthack-udp -m -d ../smarthack/discovery.py
echo
}

Expand Down Expand Up @@ -63,7 +63,7 @@ while true; do
echo "======================================================"

echo "Starting smart config pairing procedure"
./smartconfig/main.py &
PYTHONPATH=.. python3 -m smarthack.smartconfig.main

echo "Waiting for the device to install the intermediate firmware"

Expand All @@ -74,7 +74,7 @@ while true; do
echo
echo "Device did not appear with the intermediate firmware"
echo "Check the *.log files in the scripts folder"
pkill -f smartconfig/main.py && echo "Stopping smart config"
pkill -f smarthack.smartconfig.main && echo "Stopping smart config"
read -p "Do you want to try flashing another device? [y/N] " -n 1 -r
echo
[[ "$REPLY" =~ ^[Yy]$ ]] || break 2
Expand All @@ -85,7 +85,7 @@ while true; do
echo
echo "IoT-device is online with ip 10.42.42.42"

pkill -f smartconfig/main.py && echo "Stopping smart config"
pkill -f smarthack.smartconfig.main && echo "Stopping smart config"

echo "Fetching firmware backup"
sleep 2
Expand Down
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Test suite for smarthack."""
48 changes: 48 additions & 0 deletions tests/test_crc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/usr/bin/env python3
"""Test the CRC functions."""

# Copyright (c) 2020 Faidon Liambotis
# SPDX-License-Identifier: MIT

from typing import (
List,
Sequence,
Tuple,
Union,
)

from smarthack.smartconfig.crc import crc_32, crc_8 # type: ignore


PRECOMPUTED: Sequence[Tuple[Union[bytes, List[int]], int, str]] = [
# (<data>, <crc8>, <crc32>)
# first, in byte strings
(b"", 0, "00000000"),
(b"\x00", 0, "8def02d2"),
(b"0", 190, "21dfdbf4"),
(b"\x80", 140, "ad6cba3f"),
(b"foobar", 53, "951ff69e"),
(b"foobar" * 1024, 72, "e65db3fd"),
# the exact equivalent, in arrays of ints
([], 0, "00000000"),
([0], 0, "8def02d2"),
([48], 190, "21dfdbf4"),
([128], 140, "ad6cba3f"),
([102, 111, 111, 98, 97, 114], 53, "951ff69e"),
([102, 111, 111, 98, 97, 114] * 1024, 72, "e65db3fd"),
]


def test_crc8() -> None:
"""Test the CRC-8 method against a set of precomputed values."""
for data, checksum8, _ in PRECOMPUTED:
assert crc_8(data) == checksum8


def test_crc32() -> None:
"""Test the CRC-32 method against a set of precomputed values."""
for data, _, checksum32 in PRECOMPUTED:
# this is a weird way of packing to little-endian, but this is how it's
# currently being used in the callsites in the tree
crc = [(crc_32(data) >> i) & 255 for i in range(0, 32, 8)]
assert bytearray(crc).hex() == checksum32
65 changes: 65 additions & 0 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""Test the smarthack.discovery module."""

# Copyright (c) 2020 Faidon Liambotis
# SPDX-License-Identifier: MIT

import json
from typing import Any

import pytest # type: ignore

from smarthack.discovery import TuyaDiscovery

DUMMY_DATA = {
"ip": "10.42.42.14",
"gwId": "220388844c11ae0dd558",
"active": 2,
"ability": 0,
"mode": 0,
"encrypt": True,
"productKey": "RN2FVAgXG6WfAktU",
"version": "3.3",
}
DUMMY_JSON = json.dumps(DUMMY_DATA)
ENCRYPTED_DUMMY_JSON = bytes.fromhex(
"415def5dd070e9aeb8e9e533e5375ffa4ba2680192d4"
+ "f6b381547fd0e3900b52148d7ecaa398b9298731deb8"
+ "e2ed33e612c45198dcd3c617a5271b12c588b66a98b5"
+ "a7135018ddfa3ca88c5b6c41188dd48182ba55c1c346"
+ "99c5942905fd52a0f24d14479cf7a3dd4cb43d0f842d"
+ "0ddc100f4624753c0f126f24c544414cae28f814bd80"
+ "77544a44fd89391ba4daed3d18febaf9be838a361895"
+ "d51d2451d6852dd3a795424f2879a4492a51a7d90f07"
)


def test_discovery_cleartext(capsys: Any) -> None:
"""Test the receipt of cleartext discovery packets."""
addr = ("127.0.0.1", 65535)
packet = "0" * 20 + DUMMY_JSON + "0" * 8
TuyaDiscovery().datagram_received(packet, addr)

captured = capsys.readouterr()
assert captured.out == (addr[0] + " " + str(DUMMY_DATA) + "\n")


@pytest.mark.xfail(reason="encrypted discovery is broken") # type: ignore
def test_discovery_encrypted(capsys: Any) -> None:
"""Test the receipt of encrypted discovery packets."""
addr = ("127.0.0.1", 65535)
packet = b"0" * 20 + ENCRYPTED_DUMMY_JSON + b"0" * 8
TuyaDiscovery().datagram_received(packet, addr)

captured = capsys.readouterr()
assert captured.out == (addr[0] + " " + str(DUMMY_DATA) + "\n")


@pytest.mark.xfail(reason="code ignores all errors") # type: ignore
def test_discovery_invalid(capsys: Any) -> None:
"""Test the receipt of invalid discovery packets."""
addr = ("127.0.0.1", 65535)
for packet in [b"", b"\x80", b"0" * 28, b"0" * 20 + b"garbage" + b"0" * 8]:
TuyaDiscovery().datagram_received(packet, addr)
captured = capsys.readouterr()
assert captured.out != (addr[0] + " \n")
79 changes: 79 additions & 0 deletions tests/test_mqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""Test the smarthack.mqtt module."""

# Copyright (c) 2020 Faidon Liambotis
# SPDX-License-Identifier: MIT

import datetime
import time
from typing import Any

import paho.mqtt.publish # type: ignore

import pytest # type: ignore

from smarthack.mqtt import iot_dec, iot_enc, main # type: ignore


FIXED_TIME = datetime.datetime(2020, 1, 1, 0, 0, 0, 0, datetime.timezone.utc)
KEY = "k" * 16 # random key
PRECOMPUTED_21 = (
("", b"2.1f60982e36ae45003ET+owGzMcMRnHMCdHyZ81Q=="),
("foobar", b"2.14fe6da8264c3ffb2+C3QVjwIa4QPbzfpz7qFGw=="),
)
PRECOMPUTED_22 = (
("", b"2.2\x8cH\xf6\xc983680000\x11?\xa8\xc0l\xccp\xc4g\x1c\xc0\x9d\x1f&|\xd5"),
("foobar", b"2.2(\xb6hg83680000\xf8-\xd0V<\x08k\x84\x0fo7\xe9\xcf\xba\x85\x1b"),
)
# pylint: disable=bad-continuation
PRECOMPUTED_MESSAGE = {
"2.1": b"2.1a41802d787391e8eMT2UIXuzcBLQqUSz7cH+YEh2N+GD8x4hS2lCWbUP1cruE"
+ b"lURyMXmlDLL+s6k3C2L9HUSfLcAzFe6ymn6Bos18E67cERGFvN3ZyOQAMPDFT0=",
"2.2": b"2.2\xbd\xbc\x6e\xac\x38\x33\x36\x38\x30\x30\x30\x30\x31\x3d"
+ b"\x94\x21\x7b\xb3\x70\x12\xd0\xa9\x44\xb3\xed\xc1\xfe\x60\x48\x76"
+ b"\x37\xe1\x83\xf3\x1e\x21\x4b\x69\x42\x59\xb5\x0f\xd5\xca\xee\x12"
+ b"\x55\x11\xc8\xc5\xe6\x94\x32\xcb\xfa\xce\xa4\xdc\x2d\x8b\xff\xd8"
+ b"\x92\xa3\x41\x58\x22\x74\x1b\xbc\xe6\x81\x67\x0e\x99\x44\x29\xc7"
+ b"\xaa\xc9\x85\x3e\x40\xa6\x93\x28\x20\x2c\x84\x7f\xb1\xca",
}


def test_wire_format_v21() -> None:
"""Test the iot_enc() method against a set of precomputed values (v2.1)."""
for clear, encoded in PRECOMPUTED_21:
assert iot_enc(clear, KEY, "2.1") == encoded


def test_wire_format_v22(monkeypatch: Any) -> None:
"""Test the iot_enc() method against a set of precomputed values (v2.2)."""
with monkeypatch.context() as mpatch:
mpatch.setattr(time, "time", FIXED_TIME.timestamp)
for clear, encoded in PRECOMPUTED_22:
assert iot_enc(clear, KEY, "2.2") == encoded


@pytest.mark.xfail(reason="currently broken due to str/bytes bugs") # type: ignore
def test_wire_unformat() -> None:
"""Test the iot_dec() method against a set of precomputed values."""
for clear, encoded in PRECOMPUTED_21:
assert iot_dec(encoded, KEY) == clear


def test_prepared_message(monkeypatch: Any, capsys: Any) -> None:
"""Test the prepared message against a set of precomputed values."""
device_id = "0123456789"
local_key = "abcdefghijklmnop"

# ugly hack: mock paho's publish, and call main(), as that's the only way
# right now we can get the prepared (JSON-formatted/encrypted) message
# pylint: disable=unused-argument
def fake_single(topic: str, payload: bytes, hostname: str) -> None:
pass

with monkeypatch.context() as mpatch:
mpatch.setattr(time, "time", FIXED_TIME.timestamp)
mpatch.setattr(paho.mqtt.publish, "single", fake_single)
for protocol, message in PRECOMPUTED_MESSAGE.items():
main(["mqtt", "-i", device_id, "-l", local_key, "-p", protocol])
captured = capsys.readouterr()
assert captured.out.splitlines()[1] == repr(message)
32 changes: 32 additions & 0 deletions tests/test_psk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env python3
"""Test the smarthack.pskproxy module."""

# Copyright (c) 2020 Faidon Liambotis
# SPDX-License-Identifier: MIT

import pytest # type: ignore

from smarthack.pskproxy import IDENTITY_PREFIX, gen_psk


PREFIX = b"\x01" + IDENTITY_PREFIX
HINT = b"1dHRsc2NjbHltbGx3eWh5" b"0000000000000000" # taken from the code being tested
PRECOMPUTED = (
("", "310ab75a8d067ccea734482eb07adf04"),
("00" * 16, "3100cad5df5a8f719407b55e0ff55e883c53ab665efe02bd1bb93d7eda9e15e2"),
("80" * 16, "36ac9fe6f25bb46b652eeb78c9dbf2c6675f82bf574808da316295cab811efc5"),
(
"fef0f7f7d1fff1602f64cbdb482d86dc5f35949857a88ba085f9c17b7cae02ed95",
"5578f55f9e82e8ab7ea053cbacc7a3f4cc1cfd51ac202a447c5e3155bf347078",
),
)


def test_gen_psk() -> None:
"""Test the gen_psk() method against precomputed data."""
for clear, encrypted in PRECOMPUTED:
identity = PREFIX + bytes.fromhex(clear)
assert gen_psk(identity, HINT).hex() == encrypted

with pytest.raises(ValueError):
assert gen_psk(b"not 16 chars", HINT)
Loading