Skip to content

Commit

Permalink
Support for devices using AES-GCM encryotion
Browse files Browse the repository at this point in the history
New devices are using AES-GCM encryotion for communication
More details about the problem in tomikaa87#52
  • Loading branch information
lovery committed Jun 18, 2024
1 parent 4f086af commit 96bf516
Showing 1 changed file with 95 additions and 15 deletions.
110 changes: 95 additions & 15 deletions PythonCLI/gree.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
import base64
import sys

from Crypto.Cipher import AES
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import json
import socket


GENERIC_KEY = "a3K8Bx%2r8Y7#xDh"
ENCRYPTION_TYPE = 'ECB'
GENERIC_GCM_KEY = "{yxAHAY_Lm6pbC/<"
GCM_IV = b'\x54\x40\x78\x44\x49\x67\x5a\x51\x6c\x5e\x63\x13'
GCM_ADD = b'qualcomm-test'


class ScanResult:
Expand Down Expand Up @@ -38,7 +43,13 @@ def send_data(ip, port, data):


def create_request(tcid, pack_encrypted, i=0):
return '{"cid":"app","i":' + str(i) + ',"t":"pack","uid":0,"tcid":"' + tcid + '","pack":"' + pack_encrypted + '"}'
request = '{"cid":"app","i":' + str(i) + ',"t":"pack","uid":0,"tcid":"' + tcid + '",'
if (isinstance(pack_encrypted, dict)):
request += '"tag":"' + pack_encrypted["tag"] + '","pack":"' + pack_encrypted["pack"] + '"}'
else:
request += '"pack":"' + pack_encrypted + '"}'

return request


def create_status_request_pack(tcid):
Expand Down Expand Up @@ -80,6 +91,40 @@ def encrypt_generic(pack):
return encrypt(pack, GENERIC_KEY)


def create_CGM_cipher(key):
cipher = AES.new(bytes(key, 'utf-8'), AES.MODE_GCM, nonce=GCM_IV)
cipher.update(GCM_ADD)
return cipher


def decrypt_GCM(pack_encoded, tag_encoded, key):
cipher = create_CGM_cipher(key)
base64decodedPack = base64.b64decode(pack_encoded)
base64decodedTag = base64.b64decode(tag_encoded)
decryptedPack = cipher.decrypt_and_verify(base64decodedPack, base64decodedTag)
decodedPack = decryptedPack.replace(b'\xff', b'').decode('utf-8')
return decodedPack


def decrypt_GCM_generic(pack_encoded, tag_encoded):
return decrypt_GCM(pack_encoded, tag_encoded, GENERIC_GCM_KEY)


def encrypt_GCM(pack, key):
encrypted_data, tag = create_CGM_cipher(key).encrypt_and_digest(pack.encode("utf-8"))
encrypted_pack = base64.b64encode(encrypted_data).decode('utf-8')
tag = base64.b64encode(tag).decode('utf-8')
data = {
"pack": encrypted_pack,
"tag": tag
}
return data


def encrypt_GCM_generic(pack):
return encrypt_GCM(pack, GENERIC_GCM_KEY)


def search_devices():
print('Searching for devices using broadcast address: %s' % args.broadcast)

Expand All @@ -106,7 +151,13 @@ def search_devices():
print(f'search_devices: data={data}, raw_json={raw_json}')

resp = json.loads(raw_json)
pack = json.loads(decrypt_generic(resp['pack']))

if ENCRYPTION_TYPE == 'GCM':
decrypted_pack = decrypt_GCM_generic(resp['pack'], resp['tag'])
else:
decrypted_pack = decrypt_generic(resp['pack'])

pack = json.loads(decrypted_pack)

cid = pack['cid'] if 'cid' in pack and len(pack['cid']) > 0 else \
resp['cid'] if 'cid' in resp else '<unknown-cid>'
Expand All @@ -129,16 +180,21 @@ def bind_device(search_result):
print('Binding device: %s (%s, ID: %s)' % (search_result.ip, search_result.name, search_result.id))

pack = '{"mac":"%s","t":"bind","uid":0}' % search_result.id
pack_encrypted = encrypt_generic(pack)
if ENCRYPTION_TYPE == 'GCM':
pack_encrypted = encrypt_GCM_generic(pack)
else:
pack_encrypted = encrypt_generic(pack)

request = create_request(search_result.id, pack_encrypted, 1)
result = send_data(search_result.ip, 7000, bytes(request, encoding='utf-8'))

response = json.loads(result)
if response["t"] == "pack":
pack = response["pack"]

pack_decrypted = decrypt_generic(pack)
if ENCRYPTION_TYPE == 'GCM':
pack_decrypted = decrypt_GCM_generic(response["pack"], response["tag"])
else:
pack_decrypted = decrypt_generic(response["pack"])

bind_resp = json.loads(pack_decrypted)

Expand All @@ -156,10 +212,13 @@ def get_param():
cols = ','.join(f'"{i}"' for i in args.params)

pack = f'{{"cols":[{cols}],"mac":"{args.id}","t":"status"}}'
pack_encrypted = encrypt(pack, args.key)

request = '{"cid":"app","i":0,"pack":"%s","t":"pack","tcid":"%s","uid":0}' \
% (pack_encrypted, args.id)
request = '{"cid":"app","i":0,"t":"pack","uid":0,"tcid":"%s",' % args.id
if ENCRYPTION_TYPE == 'GCM':
data_encrypted = encrypt_GCM(pack, args.key)
request += '"tag":"%s","pack":"%s"}' % (data_encrypted["tag"], data_encrypted["pack"])
else:
pack_encrypted = encrypt(pack, args.key)
request += '"pack":"%s"}' % pack_encrypted

result = send_data(args.client, 7000, bytes(request, encoding='utf-8'))

Expand All @@ -171,7 +230,12 @@ def get_param():
if response["t"] == "pack":
pack = response["pack"]

pack_decrypted = decrypt(pack, args.key)
if ENCRYPTION_TYPE == 'GCM':
tag = response["tag"]
pack_decrypted = decrypt_GCM(pack, tag, args.key)
else:
pack_decrypted = decrypt(pack, args.key)

pack_json = json.loads(pack_decrypted)

if args.verbose:
Expand All @@ -196,10 +260,15 @@ def set_param():

pack = f'{{"opt":[{opts}],"p":[{ps}],"t":"cmd"}}'
print(pack)
pack_encrypted = encrypt(pack, args.key)

request = '{"cid":"app","i":0,"pack":"%s","t":"pack","tcid":"%s","uid":0}' \
% (pack_encrypted, args.id)
request = '{"cid":"app","i":0,"t":"pack","tcid":"%s","uid":0,' % args.id

if ENCRYPTION_TYPE == 'GCM':
data_encrypted = encrypt_GCM(pack, args.key)
request += '"tag":"%s","pack":"%s"}' % (data_encrypted["tag"], data_encrypted["pack"])
else:
pack_encrypted = encrypt(pack, args.key)
request += '"pack":"%s"}' % pack_encrypted

result = send_data(args.client, 7000, bytes(request, encoding='utf-8'))

Expand All @@ -211,7 +280,12 @@ def set_param():
if response["t"] == "pack":
pack = response["pack"]

pack_decrypted = decrypt(pack, args.key)
if ENCRYPTION_TYPE == 'GCM':
tag = response["tag"]
pack_decrypted = decrypt_GCM(pack, tag, args.key)
else:
pack_decrypted = decrypt(pack, args.key)

pack_json = json.loads(pack_decrypted)

if args.verbose:
Expand All @@ -228,15 +302,21 @@ def set_param():
parser.add_argument('command', help='You can use the following commands: search, get, set')
parser.add_argument('-c', '--client', help='IP address of the client device')
parser.add_argument('-b', '--broadcast', help='Broadcast IP address of the network the devices connecting to')
parser.add_argument('-i', '--id', help='Unique ID of the device')
parser.add_argument('-i', '--id', help='Unique ID of the device (mac address)')
parser.add_argument('-k', '--key', help='Unique encryption key of the device')
parser.add_argument('-e', '--encryption', help='Set the encryption type AES128 used: ECB(default), GCM')
parser.add_argument('--verbose', help='Enable verbose logging', action='store_true')
if sys.platform == 'linux':
parser.add_argument('--socket-interface', help='Bind the socket to a specific network interface')
parser.add_argument('params', nargs='*', default=None, type=str)

args = parser.parse_args()

if args.encryption is None:
ENCRYPTION_TYPE = 'ECB'
else:
ENCRYPTION_TYPE = args.encryption

command = args.command.lower()
if command == 'search':
if args.broadcast is None:
Expand Down

0 comments on commit 96bf516

Please sign in to comment.