diff --git a/PythonCLI/gree.py b/PythonCLI/gree.py index 4742962..777bfa8 100644 --- a/PythonCLI/gree.py +++ b/PythonCLI/gree.py @@ -2,6 +2,7 @@ import base64 import sys +from Crypto.Cipher import AES from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend import json @@ -9,6 +10,10 @@ GENERIC_KEY = "a3K8Bx%2r8Y7#xDh" +ENCRYPTION_TYPE = 'ECB' +GENERIC_GCM_KEY = "{yxAHAY_Lm6pbC/<" +GCM_IV = b'\x54\x40\x78\x44\x49\x67\x5a\x51\x6c\x5e\x63\x13' +GCM_ADD = b'qualcomm-test' class ScanResult: @@ -38,7 +43,13 @@ def send_data(ip, port, data): def create_request(tcid, pack_encrypted, i=0): - return '{"cid":"app","i":' + str(i) + ',"t":"pack","uid":0,"tcid":"' + tcid + '","pack":"' + pack_encrypted + '"}' + request = '{"cid":"app","i":' + str(i) + ',"t":"pack","uid":0,"tcid":"' + tcid + '",' + if (isinstance(pack_encrypted, dict)): + request += '"tag":"' + pack_encrypted["tag"] + '","pack":"' + pack_encrypted["pack"] + '"}' + else: + request += '"pack":"' + pack_encrypted + '"}' + + return request def create_status_request_pack(tcid): @@ -80,6 +91,40 @@ def encrypt_generic(pack): return encrypt(pack, GENERIC_KEY) +def create_CGM_cipher(key): + cipher = AES.new(bytes(key, 'utf-8'), AES.MODE_GCM, nonce=GCM_IV) + cipher.update(GCM_ADD) + return cipher + + +def decrypt_GCM(pack_encoded, tag_encoded, key): + cipher = create_CGM_cipher(key) + base64decodedPack = base64.b64decode(pack_encoded) + base64decodedTag = base64.b64decode(tag_encoded) + decryptedPack = cipher.decrypt_and_verify(base64decodedPack, base64decodedTag) + decodedPack = decryptedPack.replace(b'\xff', b'').decode('utf-8') + return decodedPack + + +def decrypt_GCM_generic(pack_encoded, tag_encoded): + return decrypt_GCM(pack_encoded, tag_encoded, GENERIC_GCM_KEY) + + +def encrypt_GCM(pack, key): + encrypted_data, tag = create_CGM_cipher(key).encrypt_and_digest(pack.encode("utf-8")) + encrypted_pack = base64.b64encode(encrypted_data).decode('utf-8') + tag = base64.b64encode(tag).decode('utf-8') + data = { + "pack": encrypted_pack, + "tag": tag + } + return data + + +def encrypt_GCM_generic(pack): + return encrypt_GCM(pack, GENERIC_GCM_KEY) + + def search_devices(): print('Searching for devices using broadcast address: %s' % args.broadcast) @@ -106,7 +151,13 @@ def search_devices(): print(f'search_devices: data={data}, raw_json={raw_json}') resp = json.loads(raw_json) - pack = json.loads(decrypt_generic(resp['pack'])) + + if ENCRYPTION_TYPE == 'GCM': + decrypted_pack = decrypt_GCM_generic(resp['pack'], resp['tag']) + else: + decrypted_pack = decrypt_generic(resp['pack']) + + pack = json.loads(decrypted_pack) cid = pack['cid'] if 'cid' in pack and len(pack['cid']) > 0 else \ resp['cid'] if 'cid' in resp else '' @@ -129,16 +180,21 @@ def bind_device(search_result): print('Binding device: %s (%s, ID: %s)' % (search_result.ip, search_result.name, search_result.id)) pack = '{"mac":"%s","t":"bind","uid":0}' % search_result.id - pack_encrypted = encrypt_generic(pack) + if ENCRYPTION_TYPE == 'GCM': + pack_encrypted = encrypt_GCM_generic(pack) + else: + pack_encrypted = encrypt_generic(pack) request = create_request(search_result.id, pack_encrypted, 1) result = send_data(search_result.ip, 7000, bytes(request, encoding='utf-8')) response = json.loads(result) if response["t"] == "pack": - pack = response["pack"] - pack_decrypted = decrypt_generic(pack) + if ENCRYPTION_TYPE == 'GCM': + pack_decrypted = decrypt_GCM_generic(response["pack"], response["tag"]) + else: + pack_decrypted = decrypt_generic(response["pack"]) bind_resp = json.loads(pack_decrypted) @@ -156,10 +212,13 @@ def get_param(): cols = ','.join(f'"{i}"' for i in args.params) pack = f'{{"cols":[{cols}],"mac":"{args.id}","t":"status"}}' - pack_encrypted = encrypt(pack, args.key) - - request = '{"cid":"app","i":0,"pack":"%s","t":"pack","tcid":"%s","uid":0}' \ - % (pack_encrypted, args.id) + request = '{"cid":"app","i":0,"t":"pack","uid":0,"tcid":"%s",' % args.id + if ENCRYPTION_TYPE == 'GCM': + data_encrypted = encrypt_GCM(pack, args.key) + request += '"tag":"%s","pack":"%s"}' % (data_encrypted["tag"], data_encrypted["pack"]) + else: + pack_encrypted = encrypt(pack, args.key) + request += '"pack":"%s"}' % pack_encrypted result = send_data(args.client, 7000, bytes(request, encoding='utf-8')) @@ -171,7 +230,12 @@ def get_param(): if response["t"] == "pack": pack = response["pack"] - pack_decrypted = decrypt(pack, args.key) + if ENCRYPTION_TYPE == 'GCM': + tag = response["tag"] + pack_decrypted = decrypt_GCM(pack, tag, args.key) + else: + pack_decrypted = decrypt(pack, args.key) + pack_json = json.loads(pack_decrypted) if args.verbose: @@ -196,10 +260,15 @@ def set_param(): pack = f'{{"opt":[{opts}],"p":[{ps}],"t":"cmd"}}' print(pack) - pack_encrypted = encrypt(pack, args.key) - request = '{"cid":"app","i":0,"pack":"%s","t":"pack","tcid":"%s","uid":0}' \ - % (pack_encrypted, args.id) + request = '{"cid":"app","i":0,"t":"pack","tcid":"%s","uid":0,' % args.id + + if ENCRYPTION_TYPE == 'GCM': + data_encrypted = encrypt_GCM(pack, args.key) + request += '"tag":"%s","pack":"%s"}' % (data_encrypted["tag"], data_encrypted["pack"]) + else: + pack_encrypted = encrypt(pack, args.key) + request += '"pack":"%s"}' % pack_encrypted result = send_data(args.client, 7000, bytes(request, encoding='utf-8')) @@ -211,7 +280,12 @@ def set_param(): if response["t"] == "pack": pack = response["pack"] - pack_decrypted = decrypt(pack, args.key) + if ENCRYPTION_TYPE == 'GCM': + tag = response["tag"] + pack_decrypted = decrypt_GCM(pack, tag, args.key) + else: + pack_decrypted = decrypt(pack, args.key) + pack_json = json.loads(pack_decrypted) if args.verbose: @@ -228,8 +302,9 @@ def set_param(): parser.add_argument('command', help='You can use the following commands: search, get, set') parser.add_argument('-c', '--client', help='IP address of the client device') parser.add_argument('-b', '--broadcast', help='Broadcast IP address of the network the devices connecting to') - parser.add_argument('-i', '--id', help='Unique ID of the device') + parser.add_argument('-i', '--id', help='Unique ID of the device (mac address)') parser.add_argument('-k', '--key', help='Unique encryption key of the device') + parser.add_argument('-e', '--encryption', help='Set the encryption type AES128 used: ECB(default), GCM') parser.add_argument('--verbose', help='Enable verbose logging', action='store_true') if sys.platform == 'linux': parser.add_argument('--socket-interface', help='Bind the socket to a specific network interface') @@ -237,6 +312,11 @@ def set_param(): args = parser.parse_args() + if args.encryption is None: + ENCRYPTION_TYPE = 'ECB' + else: + ENCRYPTION_TYPE = args.encryption + command = args.command.lower() if command == 'search': if args.broadcast is None: