-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathbleServer.py
executable file
·168 lines (151 loc) · 6.68 KB
/
bleServer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/python
# File: bleServer.py
# Auth: P Srinivas Rao
# Desc: Bluetooth server application that uses RFCOMM sockets
import os
import sys
import time
import logging
import logging.config
import json # Uses JSON package
import cPickle as pickle # Serializing and de-serializing a Python object structure
from bluetooth import * # Python Bluetooth library
# Module-level logger shared by startLogging's callers and the bleServer methods below.
logger = logging.getLogger('bleServerLogger')
def startLogging(
    default_path='configLogger.json',
    default_level=logging.INFO,
    env_key='LOG_CFG'
):
    """Set up logging from a JSON dictConfig file, or fall back to basicConfig.

    The configuration path is taken from the environment variable named by
    ``env_key`` when it is set and non-empty, otherwise ``default_path``.

    Args:
        default_path: JSON file holding a ``logging.config.dictConfig`` schema.
        default_level: level passed to ``logging.basicConfig`` on fallback.
        env_key: name of the environment variable that overrides the path.

    Returns:
        None. If the file is missing, unreadable, not valid JSON, or rejected
        by ``dictConfig``, ``logging.basicConfig(level=default_level)`` is used
        instead of raising, so the server still starts with usable logging.
    """
    path = os.getenv(env_key, None) or default_path

    if os.path.exists(path):
        try:
            with open(path, 'rt') as f:
                config = json.load(f)
            logging.config.dictConfig(config)
            return
        except (IOError, OSError, ValueError, TypeError,
                AttributeError, ImportError):
            # A broken config file must not prevent start-up: fall back to
            # the basic configuration and report why the file was rejected.
            logging.basicConfig(level=default_level)
            logging.getLogger('bleServerLogger').warning(
                "Failed to load logging configuration from %s; "
                "falling back to basicConfig", path, exc_info=True)
            return

    logging.basicConfig(level=default_level)
class bleServer:
    """RFCOMM Bluetooth server that receives one pickled payload and saves it as JSON.

    Intended lifecycle: ``start()`` (create, bind, listen, advertise, accept),
    ``receive()`` (read, unpickle, write ``self.jsonFile``), ``stop()`` (close
    both sockets). Each step logs its own failures through the module-level
    ``logger`` instead of raising; ``KeyboardInterrupt`` and ``SystemExit``
    are deliberately NOT swallowed so the process can be stopped.
    """

    def __init__(self, serverSocket=None, clientSocket=None):
        """Store the (optional) pre-built sockets and the service settings.

        Args:
            serverSocket: listening socket, or None to create one in start().
            clientSocket: already-accepted client socket, or None.
        """
        self.serverSocket = serverSocket
        self.clientSocket = clientSocket
        # These are needed by advertiseBluetoothService()/writeJsonFile()
        # regardless of whether the sockets were injected, so always set them.
        self.dataObj = None
        self.serviceName = "BluetoothServices"
        self.jsonFile = "text.json"
        self.uuid = "94f39d29-7d6d-437d-973b-fba39e49d4ee"

    def getBluetoothSocket(self):
        """Create the RFCOMM server socket and store it in self.serverSocket."""
        try:
            self.serverSocket = BluetoothSocket(RFCOMM)
            logger.info("Bluetooth server socket successfully created for RFCOMM service...")
        except BluetoothError:
            logger.error("Failed to create the bluetooth server socket ", exc_info=True)

    def getBluetoothConnection(self):
        """Bind the server socket to any free port, listen, and log the channel.

        Each of the three steps is attempted and logged independently.
        """
        try:
            self.serverSocket.bind(("", PORT_ANY))
            logger.info("Bluetooth server socket bind successfully on host  to PORT_ANY...")
        except Exception:
            logger.error("Failed to bind server socket on host to PORT_ANY ... ", exc_info=True)

        try:
            self.serverSocket.listen(1)
            logger.info("Bluetooth server socket put to listening mode successfully ...")
        except Exception:
            logger.error("Failed to put server socket to listening mode ... ", exc_info=True)

        try:
            port = self.serverSocket.getsockname()[1]
            logger.info("Waiting for connection on RFCOMM channel %d" % (port))
        except Exception:
            logger.error("Failed to get connection on RFCOMM channel ... ", exc_info=True)

    def advertiseBluetoothService(self):
        """Advertise self.serviceName / self.uuid as a serial-port service."""
        try:
            advertise_service(
                self.serverSocket, self.serviceName,
                service_id=self.uuid,
                service_classes=[self.uuid, SERIAL_PORT_CLASS],
                profiles=[SERIAL_PORT_PROFILE],
                # protocols = [ OBEX_UUID ]
            )
            logger.info("%s advertised successfully ..." % (self.serviceName))
        except Exception:
            logger.error("Failed to advertise bluetooth services ... ", exc_info=True)

    def acceptBluetoothConnection(self):
        """Block until a client connects and store its socket in self.clientSocket."""
        try:
            self.clientSocket, clientInfo = self.serverSocket.accept()
            logger.info("Accepted bluetooth connection from %s", clientInfo)
        except Exception:
            logger.error("Failed to accept bluetooth connection ... ", exc_info=True)

    def recvData(self):
        """Read one ``<length>:<payload>`` message from the client socket.

        Protocol replies sent to the client:
            ``EmptyBufferResend``     - an empty read was received
            ``CorruptedBufferResend`` - payload shorter than the declared length
            ``DataRecived``           - payload accepted

        Returns:
            The payload (everything after the first ``:``), or None when the
            read was empty, the header was malformed, or the socket failed.
        """
        try:
            while True:
                data = self.clientSocket.recv(1024)
                if not data:
                    self.clientSocket.send(b"EmptyBufferResend")
                    # An empty read cannot be parsed; give up explicitly
                    # rather than relying on int('') raising.
                    logger.warning("Empty buffer received over bluetooth connection")
                    return None

                # Split the length prefix from the payload at the first ':'.
                dataSizeStr, _separator, payload = data.partition(b':')
                dataSize = int(dataSizeStr)

                if len(payload) < dataSize:
                    # Short payload: ask the client to send the message again.
                    self.clientSocket.send(b"CorruptedBufferResend")
                    continue

                self.clientSocket.send(b"DataRecived")
                logger.info("Data received successfully over bluetooth connection")
                return payload
        except Exception:
            # Previously swallowed silently; surface the reason in the log.
            logger.error("Failed to receive data over bluetooth connection ... ", exc_info=True)
            return None

    def deserializedData(self, _dataRecv):
        """Unpickle ``_dataRecv`` into self.dataObj.

        Returns:
            True on success, False if unpickling failed (self.dataObj is then
            left unchanged).
        """
        try:
            # SECURITY WARNING: pickle.loads on data received from a remote
            # Bluetooth peer can execute arbitrary code. Only use with
            # trusted, paired clients, or switch the wire format to JSON.
            self.dataObj = pickle.loads(_dataRecv)
        except Exception:
            logger.error("Failed to de-serialized string ... ", exc_info=True)
            return False
        logger.info("Serialized string converted successfully to object")
        return True

    def writeJsonFile(self):
        """Dump self.dataObj to self.jsonFile as indented JSON."""
        try:
            # 'with' guarantees the handle is closed even if json.dump raises.
            with open(self.jsonFile, "w") as jsonFileObj:
                logger.info("%s file successfully opened to %s" % (self.jsonFile, jsonFileObj))
                # indent=4 is optional, but makes the file more readable
                json.dump(self.dataObj, jsonFileObj, indent=4)
                logger.info("Content dumped successfully to the %s file" % (self.jsonFile))
            logger.info("%s file successfully closed" % (self.jsonFile))
        except Exception:
            logger.error("Failed to write json contents to the file ... ", exc_info=True)

    def closeBluetoothSocket(self):
        """Close the client and server sockets, each independently.

        A missing (None) socket is skipped, and a failure closing one socket
        does not prevent the other from being closed.
        """
        allClosed = True
        for sock in (self.clientSocket, self.serverSocket):
            if sock is None:
                continue
            try:
                sock.close()
            except Exception:
                allClosed = False
                logger.error("Failed to close the bluetooth sockets ", exc_info=True)
        if allClosed:
            logger.info("Bluetooth sockets successfully closed ...")

    def start(self):
        """Create, bind, advertise and accept: bring the server to a connected state."""
        # Create the server socket
        self.getBluetoothSocket()
        # get bluetooth connection to port # of the first available
        self.getBluetoothConnection()
        # advertising bluetooth services
        self.advertiseBluetoothService()
        # Accepting bluetooth connection
        self.acceptBluetoothConnection()

    def receive(self):
        """Receive one message, unpickle it and write it to the JSON file.

        The JSON file is only written when both receiving and unpickling
        succeeded, so a failed transfer does not overwrite it with ``null``.
        """
        # receive data
        dataRecv = self.recvData()
        if dataRecv is None:
            logger.error("No data received; skipping de-serialization and json write")
            return
        # de-serializing data, then writing json object to the file
        if self.deserializedData(dataRecv):
            self.writeJsonFile()

    def stop(self):
        """Disconnect both bluetooth sockets."""
        self.closeBluetoothSocket()
if __name__ == '__main__':
    # Script entry point: configure logging, serve one transfer, then shut down.
    startLogging()
    bleSvr = bleServer()
    try:
        bleSvr.start()
        bleSvr.receive()
    finally:
        # Always release the sockets, even on Ctrl-C or an unexpected error
        # raised while waiting for a connection or for data.
        bleSvr.stop()