Skip to content

Commit

Permalink
Add cpctl
Browse files Browse the repository at this point in the history
  • Loading branch information
blavka committed Sep 18, 2018
1 parent c4482a0 commit a9f0bfb
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 0 deletions.
Empty file added cpctl/__init__.py
Empty file.
65 changes: 65 additions & 0 deletions cpctl/at.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import sys
import time
import serial

class ATException(Exception):
'''Generic communication error exception.'''
pass

class AT:
'''Class AT communication with device.'''

def __init__(self, device):
'''Open device and flush buffers.'''
exclusive = False if os.name == 'nt' or sys.platform == 'win32' else True
self._device = device
self._ser = None

def _connect(self):
if self._ser:
return
try:
self._ser = serial.Serial(self._device, baudrate=115200, timeout=3)
except Exception as e:
raise ATException(str(e))

time.sleep(0.1)
self._ser.reset_input_buffer()
self._ser.reset_output_buffer()
self._ser.write(b'\x1b')

def _read_line(self):
while True:
line = self._ser.readline()
if not line:
raise ATException('Communication error occurred!')
line = line.decode('ascii')

# print("line:", repr(line))
if line[0] == '{':
continue
if line[0] == '#':
continue
return line.strip()

def _read_response(self):
response = []
while True:
line = self._read_line()
if line == 'OK':
return response
elif line == 'ERROR':
raise ATException('Received ERROR')
else:
response.append(line)

def command(self, command):
command = 'AT' + command + '\r\n'
self._connect()
self._ser.write(command.encode('ascii'))
return self._read_response()

161 changes: 161 additions & 0 deletions cpctl/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''CLI module.'''

import datetime
import os
import sys
import time
import click
import re
from .at import AT, ATException

class CliException(Exception):
'''Generic cli error exception.'''
pass


def get_ports():
if os.name == 'nt' or sys.platform == 'win32':
from serial.tools.list_ports_windows import comports
elif os.name == 'posix':
from serial.tools.list_ports_posix import comports
return sorted(comports())


def select_device(ctx):
if 'at' not in ctx.obj:
ports = get_ports()
for i, port in enumerate(ports):
click.echo("%i %s" % (i, port[0]), err=True)
d = click.prompt('Please enter device')
for port in ports:
if port[0] == d:
device = port[0]
break
else:
try:
device = ports[int(d)][0]
except Exception as e:
raise CliException("Unknown device")

ctx.obj['at'] = AT(device)


@click.group()
@click.version_option()
@click.option('--device', '-d', type=str, help='Device path.')
@click.pass_context
def cli(ctx, device=None):
'''Cooper Control Tool.'''
if device:
ctx.obj['at'] = AT(device)


@cli.command()
def devices():
'''Print available devices.'''
for port in get_ports():
click.echo(port[0], err=True)


@cli.group()
@click.pass_context
def node(ctx):
'''Manage the nodes'''
select_device(ctx)


@node.command("list")
@click.pass_context
def node_list(ctx):
'''List attached nodes'''
node_list = ctx.obj['at'].command("$LIST")

if node_list:
for serial in node_list:
click.echo(serial)
else:
click.echo("Empty list")


@node.command("attach")
@click.argument('serial')
@click.pass_context
def node_attach(ctx, serial):
'''Attach node'''
if not re.match("^[0-9]{16}$", serial):
raise CliException("serial bad format")

node_list = ctx.obj['at'].command("$LIST")

if serial in node_list:
raise CliException("Node is in node list")

ctx.obj['at'].command("$ATTACH=" + serial)
ctx.obj['at'].command("&W")

click.echo('OK')


@node.command("detach")
@click.argument('serial')
@click.pass_context
def node_detach(ctx, serial):
'''Detach node'''
if not re.match("^[0-9]{16}$", serial):
raise CliException("serial bad format")

node_list = ctx.obj['at'].command("$LIST")

if not serial in node_list:
raise CliException("Node is not in node list")

ctx.obj['at'].command("$DETACH=" + serial)
ctx.obj['at'].command("&W")
click.echo('OK')


@node.command("purge")
@click.pass_context
def node_purge(ctx):
'''Detach all nodes'''
ctx.obj['at'].command("$PURGE")
ctx.obj['at'].command("&W")
click.echo('OK')


@cli.group()
@click.pass_context
def config(ctx):
'''Config'''
select_device(ctx)


@config.command('channel')
@click.option('--set', type=int, help='New cahnnel')
@click.pass_context
def config_channel(ctx, set=None):
'''Set channel'''
if set != None:
if set < 0 or set > 20:
raise CliException("Bad channel range")

ctx.obj['at'].command("$CHANNEL=%d" % set)
ctx.obj['at'].command("&W")

click.echo(ctx.obj['at'].command("$CHANNEL?")[0][1:])


def main():
'''Application entry point.'''
try:
cli(obj={}),
except ATException as e:
click.echo(str(e), err=True)
sys.exit(1)
except CliException as e:
click.echo(str(e), err=True)
sys.exit(1)
except KeyboardInterrupt:
pass
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
click==6.7
pyserial==3.4
30 changes: 30 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from setuptools import setup

setup(
name='cpctl',
packages=['cpctl'],
version='1.3.0',
description='Cooper Control Tool',
url='https://github.com/blavka/cpctl',
author='Karel Blavka',
author_email='[email protected]',
license='MIT',
keywords = ['cooper', 'cli', 'tool'],
classifiers=[
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 3.6',
'Topic :: Scientific/Engineering :: Human Machine Interfaces',
'Environment :: Console',
'Intended Audience :: Science/Research'
],
install_requires=[
'click==6.7', 'pyserial==3.4'
],
entry_points='''
[console_scripts]
cpctl=cpctl.cli:main
'''
)

0 comments on commit a9f0bfb

Please sign in to comment.