Skip to content

Commit

Permalink
Fix bug with AddAdvertising command
Browse files Browse the repository at this point in the history
  • Loading branch information
ukBaz committed Jun 10, 2024
1 parent cb32eb1 commit 22a6c62
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 2 deletions.
15 changes: 13 additions & 2 deletions btsocket/btmgmt_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@ def encode(self, value, width):
self.value = value


class HexStr(DataField):

def decode(self, data):
self.value = data.hex()
self.octets = data

def encode(self, value, width):
self.octets = bytes.fromhex(value)
self.value = value


class CmdCode(DataField):

def decode(self, data):
Expand Down Expand Up @@ -694,8 +705,8 @@ def __repr__(self):
Parameter(name='timeout', width=2),
Parameter(name='adv_data_len', width=1),
Parameter(name='scan_rsp_len', width=1),
Parameter(name='adv_data', width=None),
Parameter(name='scan_rsp', width=None)]),
Parameter(name='adv_data', width=None, repeat=1, bt_type='HexStr'),
Parameter(name='scan_rsp', width=None, repeat=1, bt_type='HexStr')]),
0x003f: Packet([Parameter(name='instance', width=1)]),
0x0040: Packet([Parameter(name='instance', width=1),
Parameter(name='flags', width=4)]),
Expand Down
90 changes: 90 additions & 0 deletions examples/create_advertisement_btmgmt_socket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import asyncio
import enum
import sys

from btsocket import btmgmt_socket
from btsocket import btmgmt_protocol


class Flags(enum.IntEnum):
CONNECTABLE = enum.auto()
GENERAL_DISCOVERABLE = enum.auto()
LIMITED_DISCOVERABLE = enum.auto()
FLAGS_IN_ADV_DATA = enum.auto()
TX_IN_ADV_DATA = enum.auto()
APPEARANCE_IN_ADV_DATA = enum.auto()
LOCAL_NAME_IN_ADV_DATA = enum.auto()
PHY_LE_1M = enum.auto()
PHY_LE_2M = enum.auto()
PHY_LE_CODED = enum.auto()


def little_bytes(value, size_of):
return int(value).to_bytes(size_of, byteorder='little')


def advert_command(instance_id, flags, duration, timeout, adv_data, scan_rsp):
cmd = little_bytes(0x003e, 2)
ctrl_idx = little_bytes(0x00, 2)
instance = little_bytes(instance_id, 1) # (1 Octet)
flags = little_bytes(flags, 4) # (4 Octets)
duration = little_bytes(duration, 2) # (2 Octets)
timeout = little_bytes(timeout, 2) # (2 Octets)
adv_data = bytes.fromhex(adv_data) # (0-255 Octets)
adv_data_len = little_bytes(len(adv_data), 1) # (1 Octet)
scan_rsp = bytes.fromhex(scan_rsp) # (0-255 Octets)
scan_rsp_len = little_bytes(len(scan_rsp), 1) # (1 Octet)
params = instance + flags + duration + timeout + adv_data_len + scan_rsp_len + adv_data + scan_rsp
param_len = little_bytes(len(params), 2)

return cmd + ctrl_idx + param_len + params


def test_asyncio_usage():
sock = btmgmt_socket.open()

if sys.version_info < (3, 10):
loop = asyncio.get_event_loop()
else:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()

asyncio.set_event_loop(loop)

def reader():
raw_data = sock.recv(100)
data = btmgmt_protocol.reader(raw_data)
print("Received:", data.event_frame.command_opcode, data.event_frame.status)

# We are done: unregister the file descriptor
loop.remove_reader(sock)

# Stop the event loop
loop.stop()

# Register the file descriptor for read event
loop.add_reader(sock, reader)

# Write a command to the socket
loop.call_soon(sock.send, advert_command(
instance_id=1,
flags=Flags.GENERAL_DISCOVERABLE,
duration=0x00, # zero means use default
timeout=0x00, # zero means use default
adv_data='1bfff0ff6DB643CF7E8F471188665938D17AAA26495E131415161718',
scan_rsp='',
))

try:
# Run the event loop
loop.run_forever()
finally:
# We are done. Close sockets and the event loop.
btmgmt_socket.close(sock)
loop.close()


if __name__ == '__main__':
test_asyncio_usage()
59 changes: 59 additions & 0 deletions examples/create_advertisement_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import enum
import logging

from btsocket import btmgmt_callback
from btsocket import btmgmt_protocol


# logger = logging.getLogger("btsocket.btmgmt_callback")
# logger.setLevel(logging.DEBUG)
logger_prot = logging.getLogger('btsocket.btmgmt_protocol')
logger_prot.setLevel(logging.INFO)


class Flags(enum.IntEnum):
CONNECTABLE = enum.auto()
GENERAL_DISCOVERABLE = enum.auto()
LIMITED_DISCOVERABLE = enum.auto()
FLAGS_IN_ADV_DATA = enum.auto()
TX_IN_ADV_DATA = enum.auto()
APPEARANCE_IN_ADV_DATA = enum.auto()
LOCAL_NAME_IN_ADV_DATA = enum.auto()
PHY_LE_1M = enum.auto()
PHY_LE_2M = enum.auto()
PHY_LE_CODED = enum.auto()


ctrl_idx = 0 # hci0
instance = 1 # Arbitrary value
flags = Flags.GENERAL_DISCOVERABLE
duration = 0x00 # 0 means use default
timeout = 0x0 # 0 means use default
adv_data = "1bfff0ff6DB643CF7E8F471188665938D17AAA26495E131415161718" # (0-255 Octets)
adv_data_len = len(bytes.fromhex(adv_data))
scan_rsp = "" # (0-255 Octets)
scan_rsp_len = len(bytes.fromhex(scan_rsp))


def show_result(pkt, mgmt_class):
print(pkt)


def main():
mgmt = btmgmt_callback.Mgmt()
mgmt.add_event_callback(btmgmt_protocol.Events.CommandCompleteEvent, show_result)
mgmt.send('AddAdvertising', ctrl_idx,
instance,
flags,
duration,
timeout,
adv_data_len,
scan_rsp_len,
adv_data,
scan_rsp)

mgmt.start()


if __name__ == '__main__':
main()
61 changes: 61 additions & 0 deletions examples/create_advertisement_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import enum
import logging
from btsocket import btmgmt_sync


logger = logging.getLogger("btsocket.btmgmt_protocol")
logger.setLevel(logging.INFO)


class Flags(enum.IntEnum):
CONNECTABLE = enum.auto()
GENERAL_DISCOVERABLE = enum.auto()
LIMITED_DISCOVERABLE = enum.auto()
FLAGS_IN_ADV_DATA = enum.auto()
TX_IN_ADV_DATA = enum.auto()
APPEARANCE_IN_ADV_DATA = enum.auto()
LOCAL_NAME_IN_ADV_DATA = enum.auto()
PHY_LE_1M = enum.auto()
PHY_LE_2M = enum.auto()
PHY_LE_CODED = enum.auto()


ctrl_idx = 0 # hci0
instance = 1 # Arbitrary value
flags = Flags.GENERAL_DISCOVERABLE
duration = 0x00 # 0 means use default
timeout = 0x0 # 0 means use default
adv_data = "1bfff0ff6DB643CF7E8F471188665938D17AAA26495E131415161718" # (0-255 Octets)
adv_data_len = len(bytes.fromhex(adv_data))
scan_rsp = "" # (0-255 Octets)
scan_rsp_len = len(bytes.fromhex(scan_rsp))


def show_result(data):
print(data.event_frame.command_opcode, data.event_frame.status)


show_result(
btmgmt_sync.send(
"AddAdvertising",
ctrl_idx,
instance,
flags,
duration,
timeout,
adv_data_len,
scan_rsp_len,
adv_data,
scan_rsp,
)
)

input("Press <enter> key to end advertisement")

show_result(
btmgmt_sync.send(
"RemoveAdvertising",
ctrl_idx,
instance,
)
)
9 changes: 9 additions & 0 deletions tests/test_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ def test_power_on_cmd(self):
self.assertEqual(expected,
pkt.header.octets + pkt.cmd_params_frame.octets)

def test_add_adv(self):
expected = bytes.fromhex('3e00000027000102000000000000001c001bfff0ff6db643cf7e8f4711886659'
'38d17aaa26495e131415161718')
pkt = btmgmt_protocol.command('AddAdvertising', 0,
1, 2, 0, 0, 0x1c, 0,
"1bfff0ff6DB643CF7E8F471188665938D17AAA26495E131415161718", '')
self.assertEqual(expected,
pkt.header.octets + pkt.cmd_params_frame.octets)


if __name__ == '__main__':
unittest.main()

0 comments on commit 22a6c62

Please sign in to comment.