Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for devices using AES-GCM encryotion #64

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 95 additions & 15 deletions PythonCLI/gree.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
import base64
import sys

from Crypto.Cipher import AES
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import json
import socket


GENERIC_KEY = "a3K8Bx%2r8Y7#xDh"
ENCRYPTION_TYPE = 'ECB'
GENERIC_GCM_KEY = "{yxAHAY_Lm6pbC/<"
GCM_IV = b'\x54\x40\x78\x44\x49\x67\x5a\x51\x6c\x5e\x63\x13'
GCM_ADD = b'qualcomm-test'


class ScanResult:
Expand Down Expand Up @@ -38,7 +43,13 @@ def send_data(ip, port, data):


def create_request(tcid, pack_encrypted, i=0):
return '{"cid":"app","i":' + str(i) + ',"t":"pack","uid":0,"tcid":"' + tcid + '","pack":"' + pack_encrypted + '"}'
request = '{"cid":"app","i":' + str(i) + ',"t":"pack","uid":0,"tcid":"' + tcid + '",'
if (isinstance(pack_encrypted, dict)):
request += '"tag":"' + pack_encrypted["tag"] + '","pack":"' + pack_encrypted["pack"] + '"}'
else:
request += '"pack":"' + pack_encrypted + '"}'

return request


def create_status_request_pack(tcid):
Expand Down Expand Up @@ -80,6 +91,40 @@ def encrypt_generic(pack):
return encrypt(pack, GENERIC_KEY)


def create_CGM_cipher(key):
cipher = AES.new(bytes(key, 'utf-8'), AES.MODE_GCM, nonce=GCM_IV)
cipher.update(GCM_ADD)
return cipher


def decrypt_GCM(pack_encoded, tag_encoded, key):
cipher = create_CGM_cipher(key)
base64decodedPack = base64.b64decode(pack_encoded)
base64decodedTag = base64.b64decode(tag_encoded)
decryptedPack = cipher.decrypt_and_verify(base64decodedPack, base64decodedTag)
decodedPack = decryptedPack.replace(b'\xff', b'').decode('utf-8')
return decodedPack


def decrypt_GCM_generic(pack_encoded, tag_encoded):
return decrypt_GCM(pack_encoded, tag_encoded, GENERIC_GCM_KEY)


def encrypt_GCM(pack, key):
encrypted_data, tag = create_CGM_cipher(key).encrypt_and_digest(pack.encode("utf-8"))
encrypted_pack = base64.b64encode(encrypted_data).decode('utf-8')
tag = base64.b64encode(tag).decode('utf-8')
data = {
"pack": encrypted_pack,
"tag": tag
}
return data


def encrypt_GCM_generic(pack):
return encrypt_GCM(pack, GENERIC_GCM_KEY)


def search_devices():
print('Searching for devices using broadcast address: %s' % args.broadcast)

Expand All @@ -106,7 +151,13 @@ def search_devices():
print(f'search_devices: data={data}, raw_json={raw_json}')

resp = json.loads(raw_json)
pack = json.loads(decrypt_generic(resp['pack']))

if ENCRYPTION_TYPE == 'GCM':
decrypted_pack = decrypt_GCM_generic(resp['pack'], resp['tag'])
else:
decrypted_pack = decrypt_generic(resp['pack'])

pack = json.loads(decrypted_pack)

cid = pack['cid'] if 'cid' in pack and len(pack['cid']) > 0 else \
resp['cid'] if 'cid' in resp else '<unknown-cid>'
Expand All @@ -129,16 +180,21 @@ def bind_device(search_result):
print('Binding device: %s (%s, ID: %s)' % (search_result.ip, search_result.name, search_result.id))

pack = '{"mac":"%s","t":"bind","uid":0}' % search_result.id
pack_encrypted = encrypt_generic(pack)
if ENCRYPTION_TYPE == 'GCM':
pack_encrypted = encrypt_GCM_generic(pack)
else:
pack_encrypted = encrypt_generic(pack)

request = create_request(search_result.id, pack_encrypted, 1)
result = send_data(search_result.ip, 7000, bytes(request, encoding='utf-8'))

response = json.loads(result)
if response["t"] == "pack":
pack = response["pack"]

pack_decrypted = decrypt_generic(pack)
if ENCRYPTION_TYPE == 'GCM':
pack_decrypted = decrypt_GCM_generic(response["pack"], response["tag"])
else:
pack_decrypted = decrypt_generic(response["pack"])

bind_resp = json.loads(pack_decrypted)

Expand All @@ -156,10 +212,13 @@ def get_param():
cols = ','.join(f'"{i}"' for i in args.params)

pack = f'{{"cols":[{cols}],"mac":"{args.id}","t":"status"}}'
pack_encrypted = encrypt(pack, args.key)

request = '{"cid":"app","i":0,"pack":"%s","t":"pack","tcid":"%s","uid":0}' \
% (pack_encrypted, args.id)
request = '{"cid":"app","i":0,"t":"pack","uid":0,"tcid":"%s",' % args.id
if ENCRYPTION_TYPE == 'GCM':
data_encrypted = encrypt_GCM(pack, args.key)
request += '"tag":"%s","pack":"%s"}' % (data_encrypted["tag"], data_encrypted["pack"])
else:
pack_encrypted = encrypt(pack, args.key)
request += '"pack":"%s"}' % pack_encrypted

result = send_data(args.client, 7000, bytes(request, encoding='utf-8'))

Expand All @@ -171,7 +230,12 @@ def get_param():
if response["t"] == "pack":
pack = response["pack"]

pack_decrypted = decrypt(pack, args.key)
if ENCRYPTION_TYPE == 'GCM':
tag = response["tag"]
pack_decrypted = decrypt_GCM(pack, tag, args.key)
else:
pack_decrypted = decrypt(pack, args.key)

pack_json = json.loads(pack_decrypted)

if args.verbose:
Expand All @@ -196,10 +260,15 @@ def set_param():

pack = f'{{"opt":[{opts}],"p":[{ps}],"t":"cmd"}}'
print(pack)
pack_encrypted = encrypt(pack, args.key)

request = '{"cid":"app","i":0,"pack":"%s","t":"pack","tcid":"%s","uid":0}' \
% (pack_encrypted, args.id)
request = '{"cid":"app","i":0,"t":"pack","tcid":"%s","uid":0,' % args.id

if ENCRYPTION_TYPE == 'GCM':
data_encrypted = encrypt_GCM(pack, args.key)
request += '"tag":"%s","pack":"%s"}' % (data_encrypted["tag"], data_encrypted["pack"])
else:
pack_encrypted = encrypt(pack, args.key)
request += '"pack":"%s"}' % pack_encrypted

result = send_data(args.client, 7000, bytes(request, encoding='utf-8'))

Expand All @@ -211,7 +280,12 @@ def set_param():
if response["t"] == "pack":
pack = response["pack"]

pack_decrypted = decrypt(pack, args.key)
if ENCRYPTION_TYPE == 'GCM':
tag = response["tag"]
pack_decrypted = decrypt_GCM(pack, tag, args.key)
else:
pack_decrypted = decrypt(pack, args.key)

pack_json = json.loads(pack_decrypted)

if args.verbose:
Expand All @@ -228,15 +302,21 @@ def set_param():
parser.add_argument('command', help='You can use the following commands: search, get, set')
parser.add_argument('-c', '--client', help='IP address of the client device')
parser.add_argument('-b', '--broadcast', help='Broadcast IP address of the network the devices connecting to')
parser.add_argument('-i', '--id', help='Unique ID of the device')
parser.add_argument('-i', '--id', help='Unique ID of the device (mac address)')
parser.add_argument('-k', '--key', help='Unique encryption key of the device')
parser.add_argument('-e', '--encryption', help='Set the encryption type AES128 used: ECB(default), GCM')
parser.add_argument('--verbose', help='Enable verbose logging', action='store_true')
if sys.platform == 'linux':
parser.add_argument('--socket-interface', help='Bind the socket to a specific network interface')
parser.add_argument('params', nargs='*', default=None, type=str)

args = parser.parse_args()

if args.encryption is None:
ENCRYPTION_TYPE = 'ECB'
else:
ENCRYPTION_TYPE = args.encryption

command = args.command.lower()
if command == 'search':
if args.broadcast is None:
Expand Down