Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate APRS Passcode and send it if the option is turned on #4

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 86 additions & 58 deletions ambient_aprs/ambient_aprs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@


class AmbientAPRS:
send_id = 'Ambient APRS'
send_id = "Ambient APRS"

station_id = None

server_host = 'cwop.aprs.net'
server_host = "cwop.aprs.net"
server_port = 14580
address = None

Expand All @@ -22,24 +22,29 @@ class AmbientAPRS:
packet_data = None

def __init__(self, **kwargs):
self.send_id = kwargs.get('send_id', 'Ambient APRS')
self.send_id = kwargs.get("send_id", "Ambient APRS")

self.station_id = kwargs.get('station_id', None)
self.server_host = kwargs.get('host', 'cwop.aprs.net')
self.server_port = kwargs.get('port', 14580)
self.station_id = kwargs.get("station_id", None)
self.server_host = kwargs.get("host", "cwop.aprs.net")
self.server_port = kwargs.get("port", 14580)

self.latitude = kwargs.get('latitude', None)
self.longitude = kwargs.get('longitude', None)
self.latitude = kwargs.get("latitude", None)
self.longitude = kwargs.get("longitude", None)

if self.latitude and self.longitude:
self.position = '%s/%s_' % (self.convert_latitude(self.latitude), self.convert_longitude(self.longitude))
self.position = "%s/%s_" % (self.convert_latitude(self.latitude), self.convert_longitude(self.longitude))

if self.station_id:
self.address = '%s>APRS,TCPIP*:' % self.station_id
if kwargs.get("use_passcode", False):
passcode = self.aprs_passcode()
self.address = "%s-%i>APRS,TCPIP*:" % (self.station_id, passcode)

else:
self.address = "%s>APRS,TCPIP*:" % self.station_id

def log_message(self, message):
log_date = datetime.now().isoformat()[0:19]
logmessage = '%s: %s' % (log_date, message)
logmessage = "%s: %s" % (log_date, message)

logger.info(logmessage)

Expand All @@ -50,11 +55,11 @@ def decdeg2dms(self, degrees_decimal):
degrees, minutes = divmod(minutes, 60)
degrees = degrees if is_positive else -degrees

degrees = str(int(degrees)).zfill(2).replace('-', '0')
minutes = str(int(minutes)).zfill(2).replace('-', '0')
seconds = str(int(round(seconds * .01, 2) * 100)).zfill(2)
degrees = str(int(degrees)).zfill(2).replace("-", "0")
minutes = str(int(minutes)).zfill(2).replace("-", "0")
seconds = str(int(round(seconds * 0.01, 2) * 100)).zfill(2)

return {'degrees': degrees, 'minutes': minutes, 'seconds': seconds}
return {"degrees": degrees, "minutes": minutes, "seconds": seconds}

def decdeg2dmm_m(self, degrees_decimal):
is_positive = degrees_decimal >= 0
Expand All @@ -63,36 +68,36 @@ def decdeg2dmm_m(self, degrees_decimal):
degrees, minutes = divmod(minutes, 60)
degrees = degrees if is_positive else -degrees

degrees = str(int(degrees)).zfill(2).replace('-', '0')
degrees = str(int(degrees)).zfill(2).replace("-", "0")
minutes = str(round(minutes + (seconds / 60), 2)).zfill(5)

return {'degrees': degrees, 'minutes': minutes}
return {"degrees": degrees, "minutes": minutes}

def convert_latitude(self, degrees_decimal):
det = self.decdeg2dmm_m(degrees_decimal)
if degrees_decimal > 0:
direction = 'N'
direction = "N"
else:
direction = 'S'
direction = "S"

degrees = det.get('degrees')
minutes = det.get('minutes')
degrees = det.get("degrees")
minutes = det.get("minutes")

lat = '%s%s%s' % (degrees, str(minutes), direction)
lat = "%s%s%s" % (degrees, str(minutes), direction)

return lat

def convert_longitude(self, degrees_decimal):
det = self.decdeg2dmm_m(degrees_decimal)
if degrees_decimal > 0:
direction = 'E'
direction = "E"
else:
direction = 'W'
direction = "W"

degrees = det.get('degrees')
minutes = det.get('minutes')
degrees = det.get("degrees")
minutes = det.get("minutes")

lon = '%s%s%s' % (degrees, str(minutes), direction)
lon = "%s%s%s" % (degrees, str(minutes), direction)

return lon

Expand All @@ -111,31 +116,28 @@ def str_or_dots(self, number, length):
retn_value = number

if not number:
retn_value = '.' * length
retn_value = "." * length

else:
format_type = {
'int': 'd',
'float': '.0f',
}[type(number).__name__]
format_type = {"int": "d", "float": ".0f"}[type(number).__name__]

retn_value = ''.join(('%0', str(length), format_type)) % number
retn_value = "".join(("%0", str(length), format_type)) % number

return retn_value

def make_aprs_wx(self, **kwargs):
wind_dir = kwargs.get('wind_dir', None)
wind_speed = kwargs.get('wind_speed', None)
wind_gust = kwargs.get('wind_gust', None)
temperature = kwargs.get('temperature', None)
rain_last_hr = kwargs.get('rain_last_hr', None)
rain_last_24_hrs = kwargs.get('rain_last_24_hrs', None)
rain_since_midnight = kwargs.get('rain_since_midnight', None)
humidity = kwargs.get('humidity', None)
pressure = kwargs.get('pressure', None)
wind_dir = kwargs.get("wind_dir", None)
wind_speed = kwargs.get("wind_speed", None)
wind_gust = kwargs.get("wind_gust", None)
temperature = kwargs.get("temperature", None)
rain_last_hr = kwargs.get("rain_last_hr", None)
rain_last_24_hrs = kwargs.get("rain_last_24_hrs", None)
rain_since_midnight = kwargs.get("rain_since_midnight", None)
humidity = kwargs.get("humidity", None)
pressure = kwargs.get("pressure", None)

# Assemble the weather data of the APRS packet
return '%s/%sg%st%sr%sp%sP%sh%sb%s' % (
return "%s/%sg%st%sr%sp%sP%sh%sb%s" % (
self.str_or_dots(wind_dir, 3),
self.str_or_dots(wind_speed, 3),
self.str_or_dots(wind_gust, 3),
Expand All @@ -148,7 +150,7 @@ def make_aprs_wx(self, **kwargs):
)

def get_weather_data(self, **kwargs):
weather = kwargs.get('weather_data', False)
weather = kwargs.get("weather_data", False)

if not weather:
amb_api = AmbientAPI()
Expand All @@ -160,29 +162,55 @@ def get_weather_data(self, **kwargs):
if weather and isinstance(weather, dict):
# Prepare the data, which will be sent
self.wx_data = self.make_aprs_wx(
wind_dir=weather.get('winddir'),
wind_speed=float(weather.get('windspeedmph')),
wind_gust=float(weather.get('windgustmph')),
temperature=weather.get('tempf'),
rain_last_hr=weather.get('hourlyrainin'),
wind_dir=weather.get("winddir"),
wind_speed=float(weather.get("windspeedmph")),
wind_gust=float(weather.get("windgustmph")),
temperature=weather.get("tempf"),
rain_last_hr=weather.get("hourlyrainin"),
rain_last_24_hrs=None,
rain_since_midnight=weather.get('dailyrainin'),
humidity=weather.get('humidity'),
rain_since_midnight=weather.get("dailyrainin"),
humidity=weather.get("humidity"),
# Attention, barometric pressure in tenths of millibars/tenths of hPascal!
pressure=self.hg_to_mbar(weather.get('baromabsin'))
pressure=self.hg_to_mbar(weather.get("baromabsin")),
)

return self.wx_data

def aprs_passcode(self, callsign=False):
if not callsign:
callsign = self.station_id

if "-" in callsign:
callsign = callsign.split("-")[0]

if len(callsign) > 10:
callsign = callsign[0:10]

callsign = callsign.upper()

# initialize hash
hash = 0x73E2

call_chunks = list(zip(*[iter(callsign)] * 2))

for call_chunk in call_chunks:
hash ^= ord(call_chunk[0]) << 8
hash ^= ord(call_chunk[1])

retn = int(hash & 0x7FFF)

return retn

def build_packet(self):
if self.address and self.position and self.wx_data:
utc_datetime = datetime.now()
self.packet_data = '%s@%sz%s%s%s' % (
self.packet_data = "%s@%sz%s%s%s" % (
self.address,
utc_datetime.strftime("%d%H%M"),
self.position,
self.wx_data,
self.send_id)
self.send_id,
)

return self.packet_data

Expand All @@ -193,13 +221,13 @@ def send_packet(self, packet=None):
sSock = socket(AF_INET, SOCK_STREAM)
sSock.connect((self.server_host, self.server_port))
# Log on
login = 'user %s pass -1 vers Python\n' % self.station_id
sSock.send(login.encode('utf-8'))
login = "user %s pass -1 vers Python\n" % self.station_id
sSock.send(login.encode("utf-8"))
# Send packet
if packet:
sSock.send(packet.encode('utf-8'))
sSock.send(packet.encode("utf-8"))
elif self.packet_data:
sSock.send(self.packet_data.encode('utf-8'))
sSock.send(self.packet_data.encode("utf-8"))
# Close socket, must be closed to avoid buffer overflow
sSock.shutdown(0)
sSock.close()
Expand Down
4 changes: 2 additions & 2 deletions build_release.sh
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
python3 setup.py sdist bdist_wheel
python setup.py sdist bdist_wheel
# Test PyPi
#twine upload --repository-url https://test.pypi.org/legacy/ dist/*
# Prod PyPi
twine upload dist/*
twine upload dist/*
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='ambient_aprs',
version='0.11.0',
version='0.11.2',
packages=['ambient_aprs'],
url='https://github.com/avryhof/ambient_aprs',
license='MIT',
Expand Down
2 changes: 2 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from ambient_aprs.ambient_aprs import AmbientAPRS

aa = AmbientAPRS(
host='rotate.aprs.net',
use_passcode=True,
station_id='KD2OTL',
latitude=43.131258,
longitude=-76.155028
Expand Down