-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgovee_h5055_mqtt.py
152 lines (114 loc) · 6.16 KB
/
govee_h5055_mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/python
'''
This is a python Bluetooth advertisement scanner for the Govee brand Bluetooth BBQ
temperature sensor. Tested on model H5055 using Raspberry Pi 4.
Temperatures are published as MQTT messages.
Credit: I used information for Govee advertisement format from
github.com/Thrilleratplay/GoveeWatcher
github.com/tsaitsai/govee_bluetooth_gateway
Install dependencies:
sudo apt-get install python3-pip libglib2.0-dev
sudo pip3 install bluepy
sudo apt install -y mosquitto mosquitto-clients
sudo pip3 install paho-mqtt
Needs sudo to run on Raspbian
sudo python3 govee_h5055_mqtt.py
Run in background
sudo nohup python3 govee_h5055_mqtt.py &
'''
from __future__ import print_function
from time import gmtime, strftime, sleep
from bluepy.btle import Scanner, DefaultDelegate, BTLEException
import sys
import paho.mqtt.client as mqtt
import pickle
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
def on_message(client, userdata, msg):
print("on message")
client = mqtt.Client()
mqtt_prefix = "/sensor/govee"
mqtt_gateway_name = "/Mosquitto/"
debug = []
received_channels = []
class ScanDelegate(DefaultDelegate):
global client
# mqtt message topic/payload: /prefix/gateway_name/mac/
global mqtt_prefix
global mqtt_gateway_name
def handleDiscovery(self, dev, isNewDev, isNewData):
#if (dev.addr == "a4:c1:38:xx:xx:xx") or (dev.addr == "a4:c1:38:xx:xx:xx"):
if dev.addr[:8]=="a4:c1:38":
#returns a list, of which the [2] item of the [3] tupple is manufacturing data
adv_list = dev.getScanData()
#debug
if 0:
debug.append(adv_list)
with open('adv_list.pickle', 'wb') as handle:
pickle.dump(debug, handle, protocol=pickle.HIGHEST_PROTOCOL)
#print(isNewData)
#check if received data is payload
if (adv_list[0][2] == "05") & isNewData:
adv_manuf_data = adv_list[2][2]
print("manuf data = ", adv_manuf_data)
#read channel a measured value, low byte, high byte switched
sensor_a_meas = int(adv_manuf_data[16:18] + adv_manuf_data[14:16], 16)
sensor_a_low = int(adv_manuf_data[20:22] + adv_manuf_data[18:20], 16)
sensor_a_high = int(adv_manuf_data[24:26] + adv_manuf_data[22:24], 16)
#read channel b measured value, low byte, high byte switched
sensor_b_meas = int(adv_manuf_data[30:32] + adv_manuf_data[28:30], 16)
sensor_b_low = int(adv_manuf_data[34:36] + adv_manuf_data[32:34], 16)
sensor_b_high = int(adv_manuf_data[38:40] + adv_manuf_data[36:38], 16)
#Read Battery charge
bat_hex = adv_manuf_data[8:10]
bat_int = int(bat_hex,16)
#extract Payload Channel number
ch_hex = adv_manuf_data[10:12]
ch_int = int(ch_hex,16)
#keep only the first 2 bits
#ch_nr = ch_int & int(0b11000000)
#konvert to 8 bit string
ch_nr = format(ch_int, "08b")
#extract channel number from first 2 bits
channel = 2*int(ch_nr[0:2],2) + 1
if not (channel in received_channels):
publish_data = True
received_channels.append(channel)
print("publishing data")
else:
publish_data = False
print("already published")
#check in lower 6 bits, if channel is connected
#die beiden Bitmuster mit AND verknüpfen, und schauen, ob der Sensor connected ist
#z.B. Sensor 5 => 2**5 = 32 mit Bitmuster 10 0000
#nun schauen wir ob diese 1 sich im Bitmuster 1011 0011 wiederfindet (hier wären z.B. Sensor 1,2,5,6) verbunden)
ch_a_connected = ch_int & 2**(channel-1) > 0
ch_b_connected = ch_int & 2**(channel) > 0
mac=dev.addr
signal = dev.rssi
mqtt_topic = mqtt_prefix + mqtt_gateway_name + mac + "/"
#client.publish(mqtt_topic+"alarm", alarm, qos=0)
if publish_data:
client.publish(mqtt_topic+"batt", bat_int, qos=0)
if ch_a_connected:
#print("mac=", mac, " percent humidity ", hum_percent, " temp_F = ", temp_F, " battery percent=", battery_percent, " rssi=", signal)
mqtt_topic = mqtt_prefix + mqtt_gateway_name + mac + "/Channel_" + str(channel) + "/"
#client.publish(mqtt_topic+"rssi", signal, qos=0)
client.publish(mqtt_topic+"temp", sensor_a_meas, qos=0)
client.publish(mqtt_topic+"low", sensor_a_low, qos=0)
client.publish(mqtt_topic+"high", sensor_a_high, qos=0)
if ch_b_connected:
#client.publish(mqtt_topic+"battery_pct", battery_percent, qos=0)
mqtt_topic = mqtt_prefix + mqtt_gateway_name + mac + "/Channel_" + str(channel + 1) + "/"
#client.publish(mqtt_topic+"rssi", signal, qos=0)
client.publish(mqtt_topic+"temp", sensor_b_meas, qos=0)
client.publish(mqtt_topic+"low", sensor_b_low, qos=0)
client.publish(mqtt_topic+"high", sensor_b_high, qos=0)
sys.stdout.flush()
scanner = Scanner().withDelegate(ScanDelegate())
#replace localhost with your MQTT broker
client.connect("localhost",1883,60)
client.on_connect = on_connect
client.on_message = on_message
#while True:
scanner.scan(10.0, passive=True)