"""BLE UART peripheral that relays data from a local TCP socket to a central.

A Bluezero peripheral exposing one UART service with an RX (write) and a TX
(notify) characteristic is published.  When the central enables notifications
on TX, a TCP connection to ``SOCKET_ADDRESS`` is opened and a worker thread
forwards everything read from that socket to the TX characteristic.
"""
import os
import signal
import socket
import threading
from time import sleep

from gi.repository import GLib

# Bluezero modules
from bluezero import adapter
from bluezero import peripheral
from bluezero import device
from bluezero.localGATT import Characteristic

# constants
UART_SERVICE = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
RX_CHARACTERISTIC = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
TX_CHARACTERISTIC = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'

# Endpoint the bridge connects to once the central subscribes to TX.
SOCKET_ADDRESS = ("127.0.0.1", 1883)
# Initial socket read size; presumably the payload of a default ATT MTU
# -- TODO confirm.
DEFAULT_MTU = 20
# Attempts at pushing one chunk to the TX characteristic before giving up.
MAX_TX_ATTEMPTS = 10
# Seconds to wait between retries (TX notification and socket connect).
RETRY_DELAY_S = 3
# Upper bound on how long teardown waits for the reader thread to finish.
JOIN_TIMEOUT_S = 5

original_sigterm_handler = signal.getsignal(signal.SIGTERM)


class UARTDevice:
    """Class-level state and callbacks for the single bridged connection.

    All state lives on the class and every method is a classmethod, so at
    most one central is served at a time; ``on_connect`` rejects a second
    device while one is already recorded.
    """

    # TX characteristic handed to ``uart_notify``; None while not notifying.
    tx_obj: Characteristic = None
    # Currently connected central, or None.
    dev_obj: device.Device = None
    # Address of ``dev_obj``; compared against the address in on_disconnect.
    dev_addr = None
    # TCP socket to SOCKET_ADDRESS, or None while no channel is open.
    s_obj: socket.socket = None
    # Worker thread running ``reader_thread``, or None.
    t_obj: threading.Thread = None
    # Socket read size; grows to the largest write received from the central.
    mtu: int = DEFAULT_MTU

    @classmethod
    def reader_thread(cls):
        """Forward data read from the socket to the TX characteristic.

        Runs until the socket reports end-of-stream, an error occurs, or the
        channel is torn down by another thread.  If the loop ended on its own
        (the socket is still the active one) the full ``cleanup`` is run;
        if teardown was started elsewhere the thread just exits, so that it
        does not issue a second device disconnect.
        """
        sock = cls.s_obj
        try:
            while sock is not None and cls.s_obj is sock:
                chunk = sock.recv(cls.mtu)
                if not chunk:
                    # Empty read: peer closed, or the socket was shut down.
                    break
                cls._notify_with_retry(chunk, sock)
        except Exception as e:
            # Boundary of the worker thread: report and fall through.
            print(e)
        if sock is not None and cls.s_obj is sock:
            cls.cleanup()

    @classmethod
    def _notify_with_retry(cls, chunk, sock):
        """Push ``chunk`` to TX, retrying while ``sock`` is the active socket.

        Raises:
            RuntimeError: after ``MAX_TX_ATTEMPTS`` failed attempts.
        """
        attempts = 0
        while cls.s_obj is sock:
            attempts += 1
            if attempts > MAX_TX_ATTEMPTS:
                raise RuntimeError("maximum write retries exceeded")
            try:
                cls.update_tx(chunk)
                return
            except Exception as e:
                print(e)
                sleep(RETRY_DELAY_S)

    @classmethod
    def cleanup(cls):
        """Close the socket channel (if open) and disconnect the central."""
        print('thread cleanup')
        if cls.s_obj:
            cls.dispose_channels()
        if cls.dev_obj:
            cls.dev_obj.disconnect()

    @classmethod
    def dispose_channels(cls):
        """Drop the TX characteristic, close the socket and stop the reader.

        The socket is shut down *before* the reader thread is joined: the
        reader may be blocked in ``recv`` and only returns once the socket is
        shut down.  The join is skipped when called from the reader thread
        itself (a thread cannot join itself) and is bounded by
        ``JOIN_TIMEOUT_S`` so the caller is never blocked indefinitely.
        """
        cls.tx_obj = None

        sock, cls.s_obj = cls.s_obj, None
        if sock is not None:
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                # Not connected or already closed; nothing left to shut down.
                pass
            sock.close()

        worker, cls.t_obj = cls.t_obj, None
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=JOIN_TIMEOUT_S)

    @classmethod
    def on_connect(cls, ble_device: device.Device):
        """Record the connecting central, or reject it if one is present."""
        print("Connected to " + str(ble_device.address))
        if cls.dev_obj is None:
            print("cls.dev_obj was None")
            cls.dev_obj = ble_device
            cls.dev_addr = ble_device.address
            cls.mtu = DEFAULT_MTU
        else:
            print("cls.dev_obj was not None!")
            ble_device.disconnect()

    @classmethod
    def on_disconnect(cls, adapter_address, device_address):
        """Tear down the channel when the recorded central disconnects."""
        print("Disconnected from " + device_address)
        if cls.dev_obj and cls.dev_addr == device_address:
            cls.dispose_channels()
            cls.dev_obj = None
            cls.dev_addr = None

    @classmethod
    def uart_notify(cls, notifying: bool, characteristic: Characteristic):
        """Open or close the socket channel when TX notifications toggle.

        Enabling notifications (with a central recorded) connects to
        ``SOCKET_ADDRESS`` and starts the reader thread; disabling them
        disposes of the channel.  Connecting blocks the caller until it
        succeeds.
        """
        if not notifying:
            # client sets notify to false before disconnecting
            cls.dispose_channels()
            return
        if not cls.dev_obj:
            return
        cls.tx_obj = characteristic
        cls.s_obj = cls._connect_socket()
        cls.t_obj = threading.Thread(target=cls.reader_thread)
        cls.t_obj.start()

    @classmethod
    def _connect_socket(cls):
        """Return a socket connected to ``SOCKET_ADDRESS``, retrying forever.

        A fresh socket is created for every attempt because the state of a
        socket after a failed ``connect`` is not something to rely on.
        """
        while True:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect(SOCKET_ADDRESS)
                return sock
            except OSError as e:
                sock.close()
                print(e)
                sleep(RETRY_DELAY_S)

    @classmethod
    def update_tx(cls, value):
        """Set ``value`` on the TX characteristic, if one is registered."""
        # Read once so a concurrent teardown cannot null it between the
        # check and the call.
        tx = cls.tx_obj
        if tx:
            tx.set_value(value)

    @classmethod
    def uart_write(cls, value, options):
        """Handle a write from the central on the RX characteristic.

        Grows ``cls.mtu`` to the largest write seen so far and, while a
        socket channel is open, passes the value to ``update_tx``.
        """
        # iOS - 2 (examined with python)
        # Android - 3 (examined with Qt)
        data_len = len(value)
        if data_len > cls.mtu:
            cls.mtu = data_len
        # NOTE(review): this echoes the written value back on the TX
        # characteristic; it is never sent to the socket.  Confirm whether
        # forwarding to the socket was intended.
        if cls.s_obj:
            cls.update_tx(value)

    @classmethod
    def graceful_exit(cls, _1, _2):
        """SIGTERM handler: clean up, then terminate via the saved handler.

        After cleanup the handler captured at import time is restored and
        SIGTERM is re-sent to this process.
        """
        cls.cleanup()
        signal.signal(signal.SIGTERM, original_sigterm_handler)
        os.kill(os.getpid(), signal.SIGTERM)

    @classmethod
    def graceful_disconnect(cls, _1, _2):
        """SIGTSTP handler: clean up the current connection, keep running."""
        cls.cleanup()


def main(adapter_address):
    """Build the UART peripheral on ``adapter_address`` and publish it.

    Registers the RX (write) and TX (notify) characteristics, wires the
    connect/disconnect callbacks and installs the SIGTERM/SIGTSTP handlers
    before calling ``publish``.
    """
    ble_uart = peripheral.Peripheral(adapter_address, local_name='TopHealth')
    ble_uart.add_service(srv_id=1, uuid=UART_SERVICE, primary=True)
    ble_uart.add_characteristic(srv_id=1, chr_id=1, uuid=RX_CHARACTERISTIC,
                                value=[], notifying=False,
                                flags=['write', 'write-without-response'],
                                write_callback=UARTDevice.uart_write,
                                read_callback=None,
                                notify_callback=None)
    ble_uart.add_characteristic(srv_id=1, chr_id=2, uuid=TX_CHARACTERISTIC,
                                value=[], notifying=False,
                                flags=['notify'],
                                notify_callback=UARTDevice.uart_notify,
                                read_callback=None,
                                write_callback=None)

    ble_uart.on_connect = UARTDevice.on_connect
    ble_uart.on_disconnect = UARTDevice.on_disconnect

    signal.signal(signal.SIGTERM, UARTDevice.graceful_exit)
    signal.signal(signal.SIGTSTP, UARTDevice.graceful_disconnect)

    ble_uart.publish()


if __name__ == '__main__':
    adapters = list(adapter.Adapter.available())
    if not adapters:
        raise SystemExit('No Bluetooth adapter available')
    main(adapters[0].address)