-
Notifications
You must be signed in to change notification settings - Fork 0
/
sattrack.py
783 lines (670 loc) · 32.4 KB
/
sattrack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
"""
Author: Romain Fafet ([email protected])
"""
from numpy import cos, sin, arcsin, arctan2, einsum, sqrt, pi
from skyfield.api import load, Topos, Star, EarthSatellite
from skyfield.positionlib import Geocentric
from urllib.parse import urlparse
from skyfield.units import Angle
from functions import *
from indiclient import *
class SatTrack(object):
""" SatTrack is the core class of pysattrack
The configuration is performed by the GUI by changing the attributes.
INDI connection is controlled by connect() and disconnect() and the tracking
by start(), stop(), move(ra_angle,dec_angle) and goto_rise_and_wait().
"""
def __init__(self):
# configuration variables: they that can be assessed by the UI and saved
self.indi_server_ip = "127.0.0.1"
self.indi_port = 7624
self.indi_telescope_driver = ""
self.indi_joystick_driver = ""
self.catalogs = [
CatalogItem("Recent launches", "https://celestrak.com/NORAD/elements/tle-new.txt", True),
CatalogItem("Space stations", "https://celestrak.com/NORAD/elements/stations.txt", True),
CatalogItem("100 brightest", "https://celestrak.com/NORAD/elements/visual.txt", True),
CatalogItem("Active satellites", "https://celestrak.com/NORAD/elements/active.txt", True),
CatalogItem("Geosynchronous", "https://celestrak.com/NORAD/elements/geo.txt", True),
CatalogItem("Iridium", "https://celestrak.com/NORAD/elements/iridium.txt", True),
CatalogItem("Iridium NEXT", "https://celestrak.com/NORAD/elements/iridium-NEXT.txt", True),
CatalogItem("Starlink", "https://celestrak.com/NORAD/elements/starlink.txt", True),
CatalogItem("Globalstar", "https://celestrak.com/NORAD/elements/globalstar.txt", True),
CatalogItem("Intelsat", "https://celestrak.com/NORAD/elements/intelsat.txt", True),
CatalogItem("SES", "https://celestrak.com/NORAD/elements/ses.txt", True),
CatalogItem("Orbcomm", "https://celestrak.com/NORAD/elements/orbcomm.txt", True),
CatalogItem("Amateur Radio", "https://celestrak.com/NORAD/elements/amateur.txt", True)
]
self._selected_satellite = 'O3B FM12'
self._observer_alt = 5
#TAS
#self._observer_lat = "43.538 N"
#self._observer_lon = "6.955 E"
#Le Cannet
#self._observer_lat = "43.5652 N"
#self._observer_lon = "6.9676 E"
#Calern
self._observer_lat = "43.7530 N"
self._observer_lon = "6.9219 E"
self.observer_offset = 0 # in days
self.track_method = 0 # 0 = GOTO, 1 = Move, 2 = Timed moves, 3 = Speed
self.p_gain = 0.5
self.max_speed_ra = 1. # deg/s
self.max_speed_de = 1. # deg/s
self.joystick_speed = 1. # deg/s
self.i_sat = 0.5
self.connection_timeout = 1
self.joystick_mapping = None
self.joystick_deadband = 2000
self.joystick_expo = 0.
# dynamic data
self.ui = None
self.indiclient = IndiClient(self)
self.ts = load.timescale()
self.obs = Topos(self._observer_lat, self._observer_lon, None, None, self._observer_alt)
self.satellites_tle = dict()
self.update_tle()
planets = load('de421.bsp')
self.earth = planets['earth']
self.sun = planets['sun']
self.selected_satellite = self._selected_satellite
self.tracking = False
self.offset_joystick_speed_dec = 0. # deg/s
self.offset_joystick_speed_ra = 0.
self.offset_joystick_speed_FB = 0. # Front / Back
self.offset_joystick_speed_LR = 0. # Left / Right
self.offset_joystick_speed_time = 0. # s/s
self.offset_ui_speed_dec = 0. # deg/s
self.offset_ui_speed_ra = 0.
self.offset_ui_speed_FB = 0. # Front / Back
self.offset_ui_speed_LR = 0. # Left / Right
self.offset_ui_speed_time = 0. # s/s
self.t_offset_integration = None
self.offset_dec = 0.
self.offset_ra = 0.
self.offset_FB = 0.
self.offset_LR = 0.
self.offset_time = 0.
# Some properties to maintain to preserve internal consistency when attributes are updated and for error management
@property
def selected_satellite(self):
return self._selected_satellite
@selected_satellite.setter
def selected_satellite(self, n_sat):
# ensure that the satellite name is correct, manage the case of satellites whose key (name) is a number
try:
self.sat = self.satellites_tle[n_sat]
except:
try:
self.sat = self.satellites_tle[int(n_sat)]
except:
pass
else:
self._selected_satellite = int(n_sat)
else:
self._selected_satellite = n_sat
def set_tle(self, tle):
# set a custom TLE. If the format is incorrect, a ValueError exception is raised.
try:
lines = tle.strip().splitlines()
except IndexError:
raise ValueError("incorrect number of lines")
self.sat = EarthSatellite(lines[0], lines[1]) # also raise a ValueError
self._selected_satellite = "TLE"
@property
def observer_alt(self):
return self._observer_alt
@observer_alt.setter
def observer_alt(self, n_alt):
# A valueError will be raised by Topos in cas of incorrect arguments. In this case the error is transfered to
# the upper level without modifying_the property
self.obs = Topos(self._observer_lat, self._observer_lon, None, None, n_alt)
self._observer_alt = n_alt
@property
def observer_lat(self):
return self._observer_lat
@observer_lat.setter
def observer_lat(self, n_lat):
# A valueError will be raised by Topos in cas of incorrect arguments. In this case the error is transfered to
# the upper level without modifying_the property
self.obs = Topos(n_lat, self._observer_lon, None, None, self._observer_alt)
self._observer_lat = n_lat
@property
def observer_lon(self):
return self._observer_lon
@observer_lon.setter
def observer_lon(self, n_lon):
# A valueError will be raised by Topos in cas of incorrect arguments. In this case the error is transfered to
# the upper level without modifying_the property
self.obs = Topos(self._observer_lat, n_lon, None, None, self._observer_alt)
self._observer_lon = n_lon
# Utility functions
def sat_pos(self, t=None):
""" Satellite position: ra, dec, distance"""
diff = self.sat - self.obs
if t is None:
t = self.t()
topocentric = diff.at(t)
ra, dec, distance = topocentric.radec()
return ra, dec, distance
def illuminated(self, t=None):
""" return false if the satellite is in shadow """
if t is None:
t = self.t()
# projection of the earth center on the ray between the sun and the satellite
earth_sat_vect = self.sat.at(t).position.au
sun_sat_vect = (self.sat - (self.sun - self.earth)).at(t).position.au
product = scalar_product(earth_sat_vect, sun_sat_vect)
if product < 0: # The satellite is in front of the earth
return True
else: # The satellite is after the earth, let's check if the ray is underground
sun_sat_vect = sun_sat_vect / sqrt(scalar_product(sun_sat_vect, sun_sat_vect)) # normalize
earth_projection_vect = earth_sat_vect - scalar_product(earth_sat_vect, sun_sat_vect) * sun_sat_vect
earth_projection = Geocentric(position_au=earth_projection_vect, center=399, t=t)
return earth_projection.subpoint().elevation.m > 0.
def in_shadow(self, t=None):
return not self.illuminated(t)
def telescope_pos(self):
""" Telescope position: ra, dec """
if self.indiclient.telescope_features["minimal"]:
radec = self.indiclient.telescope.getNumber("EQUATORIAL_EOD_COORD")
star = Star(ra_hours=radec[0].value, dec_degrees=radec[1].value)
# alt, az = self.obs.at(self.t()).observe(star).apparent().altaz()
return star.ra, star.dec
else:
raise Error("Unable to get the coordinates from the telescope")
# noinspection PyProtectedMember
def radec2altaz(self, ra: float, dec: float, t=None):
"""
Convert ra,dec in alt,az
:param ra: ra in rad
:param dec: dec in rad
:param t: Skyfield Time object, use current time if omitted
:return: alt, az in rad
"""
if t is None:
t = self.t()
rot = self.obs._altaz_rotation(t)
cosdec = cos(dec)
radec = [cosdec * cos(ra), cosdec * sin(ra), sin(dec)]
altaz = einsum("ij,j->i", rot, radec)
alt = arcsin(altaz[2])
az = arctan2(altaz[1], altaz[0])
return alt, az
def radec2altaz_2(self, ra: Angle, dec: Angle, t=None):
"""
Convert ra,dec in alt,az
:param ra: ra as skyfield Angle object
:param dec: dec as skyfield Angle object
:param t: Skyfield Time object, use current time if omitted
:return: alt, az as skyfield Angle object
"""
alt, az = self.radec2altaz(ra.radians, dec.radians, t)
return Angle(radians=alt, preference="degrees"), Angle(radians=az, preference="degrees")
def altaz2radec(self, alt: float, az: float, t=None):
"""
Convert alt,az in ra,dec
:param alt: alt in rad
:param az: az in rad
:param t: Skyfield Time object, use current time if omitted
:return: ra, dec in rad
"""
if t is None:
t = self.t()
rot = self.obs._altaz_rotation(t)
cosalt = cos(alt)
altaz = [cosalt * cos(az), cosalt * sin(az), sin(alt)]
radec = einsum("ji,j->i", rot, altaz)
dec = arcsin(radec[2])
ra = arctan2(radec[1], radec[0])
ra = ra if ra >=0 else ra+2*pi
altaz_inv = einsum("ij,j->i", rot, radec)
cosdec = cos(dec)
radec = [cosdec * cos(ra), cosdec * sin(ra), sin(dec)]
altaz_2 = einsum("ij,j->i", rot, radec)
alt2 = arcsin(altaz_2[2])
az2 = arctan2(altaz_2[1], altaz_2[0])
return ra, dec
def altaz2radec_2(self, alt: Angle, az: Angle, t=None):
"""
Convert alt,az in ra,dec
:param alt: alt as skyfield Angle object
:param az: az as skyfield Angle object
:param t: Skyfield Time object, use current time if omitted
:return: ra, dec as skyfield Angle object
"""
ra, dec = self.altaz2radec(alt.radians, az.radians, t)
return Angle(radians=ra, preference="hours"), Angle(radians=dec, preference="degrees")
def t(self):
""" Current software time"""
tmp = self.ts.now().tt + self.observer_offset
return self.ts.tt(jd=tmp)
def set_time(self, year, month=1, day=1, hour=0, minute=0, second=0.0):
self.observer_offset = self.ts.utc(year, month, day, hour, minute, second).tt - self.ts.now().tt
def t_iso(self):
""" Current software time in iso format"""
tmp = self.ts.now().tt + self.observer_offset
return self.ts.tt(jd=tmp).utc_iso()
def log(self, level, text):
""" level: 0 for error, 1 for warning, 2 for common messages, 3 for extended logging """
if self.ui is not None:
if level == 0:
print("ERROR: " + text)
self.ui.log_browser.append("<font color=\"Red\">{0} - ERROR: {1}</font>"
.format(self.t_iso(), text))
elif level == 1:
print("WARNING: " + text)
self.ui.log_browser.append("<font color=\"Orange\">{0} - WARNING: {1}</font>"
.format(self.t_iso(), text))
elif level == 2:
print("Info: " + text)
self.ui.log_browser.append("<font color=\"Black\">{0} - Info: {1}</font>"
.format(self.t_iso(), text))
else:
pass
print("Debug: " + text)
# self.ui.log_browser.append("<font color=\"Blue\">{0} - Debug: {1}</font>"
# .format(self.t_iso(), text))
def update_tle(self, max_age=3):
""" Update satellite elements, only elements older than 'max_age' days are downloaded """
self.satellites_tle = dict()
for catalog in self.catalogs:
if catalog.active:
try:
current_tle = load.tle(catalog.url, filename=catalog.filename()) # try to get TLE from disk
age = self.t() - current_tle[list(current_tle)[0]].epoch
self.log(2, 'TLE for ' + catalog.name + ' are {:.3f} days old'.format(age))
force_update = abs(age) > max_age
except OSError as err:
self.log(1, "TLE file not found:\n" + str(err))
force_update = True
current_tle={}
if force_update:
try:
current_tle = load.tle(catalog.url, reload=True)
except OSError as err:
self.log(1, "Impossible to download TLE:\n" + str(err))
self.satellites_tle.update(current_tle)
if self.ui is not None:
self.ui.update_sat_list()
# next pass prediction
def next_pass(self, t0, backward=False):
"""
Predict the next pass. Return a tuple containing the dates and alt az angles of the rise, culmination, meridian crossing and set, or None if the respective event does no happen.
Return ((None,None, None, None, t0),(None,None, None, None),(None,None, None, None)) if no pass is found.
:param t0: time to start the search
:param backward: backward = True will search the previous pass
:return: ((t_rise, t_culmination, t_meridian, t_set, t0), (alt_rise, alt_culmination, alt_meridian, alt_set), (az_rise, az_culmination, az_meridian, az_set))
"""
t0 = t0.tt
def alt_t(dt):
tjd = t0 + dt / 86400
ra, dec, dist = self.sat_pos(t=self.ts.tt(jd=tjd))
alt, az = self.radec2altaz(ra.radians, dec.radians, t=self.ts.tt(jd=tjd))
return alt
def az_t(dt):
tjd = t0 + dt / 86400
ra, dec, dist = self.sat_pos(t=self.ts.tt(jd=tjd))
alt, az = self.radec2altaz(ra.radians, dec.radians, t=self.ts.tt(jd=tjd))
return az
direction = -1 if backward else 1
step = 60 * 20 # less than 1/4 of orbit
small_step = 30 # few deg
smaller_step = 1 # time resolution
trajectory_segments = 20
dt0 = 0
dt_rise = None
dt_set = None
dt_meridian = None
dt_culmination = None
while True:
# Ascending / descending orbit coarse detection
alt0 = alt_t(dt0)
alt1 = alt_t(dt0 + step * direction)
while alt1 < alt0 and dt0 * direction < 86400 and alt1 < 0:
self.log(3, "Phase 1: Decreasing at dt0={0}, alt0={1}, alt1={2}".format(dt0, alt0, alt1))
alt0 = alt1
dt0 = dt0 + step * direction
alt1 = alt_t(dt0 + step * direction)
while alt1 >= alt0 and dt0 * direction < 86400:
self.log(3, "Phase 1: Increasing at dt0={0}, alt0={1}, alt1={2}".format(dt0, alt0, alt1))
alt0 = alt1
dt0 = dt0 + step * direction
alt1 = alt_t(dt0 + step * direction)
if dt0 * direction > 86400 and alt1 < 0:
self.log(2, "no other pass for today")
return (None, None, None, None, t0), (None, None, None, None), (None, None, None, None)
self.log(3,
"Phase 1: Maximum between dt0={0} and {1}".format(dt0 - step * direction, dt0 + step * direction))
dt0 = dt0 - step * direction
alt0 = alt_t(dt0)
alt1 = alt_t(dt0 + small_step * direction)
# Refining the maximum altitude
while alt1 >= alt0 and alt1 < 10 and dt0 * direction < 86400:
self.log(3, "Phase 2: Increasing at dt0={0}, alt0={1}, alt1={2}".format(dt0, alt0, alt1))
alt0 = alt1
dt0 = dt0 + small_step * direction
alt1 = alt_t(dt0 + small_step * direction)
if alt1 < 0:
if dt0 * direction < 86400:
continue # restart the search for the following orbit
else:
self.log(2, "no pass found")
return (None, None, None, None, t0), (None, None, None, None), (None, None, None, None)
else:
# Refining a 2nd time the maximum altitude to get the date
alt1 = alt_t(dt0 + smaller_step * direction)
while alt1 >= alt0:
if dt0 * direction > 86400:
self.log(3, "no culmination today")
return (None, None, None, None, t0), (None, None, None, None), (None, None, None, None)
self.log(3, "Phase 3: Increasing at dt0={0}, alt0={1}, alt1={2}".format(dt0, alt0, alt1))
alt0 = alt1
dt0 = dt0 + smaller_step * direction
alt1 = alt_t(dt0 + smaller_step * direction)
dt_culmination = dt0
# refining rise and set date by dichotomy
# set
dt0 = dt_culmination
alt0 = alt_t(dt_culmination)
alt1 = alt_t(dt_culmination + step)
while alt1 >= 0 and dt0 < 86400:
self.log(3, "Phase 4 set: Increasing at dt0={0}, alt0={1}, alt1={2}".format(dt0, alt0, alt1))
alt0 = alt1
dt0 = dt0 + step
alt1 = alt_t(dt0 + step)
if dt0 > 86400:
self.log(3, "no set today")
else:
dt1 = dt0 + step
while dt1 - dt0 > smaller_step:
dt2 = (dt0 + dt1) / 2
alt2 = alt_t(dt2)
if alt2 > 0:
dt0 = dt2
alt0 = alt2
else:
dt1 = dt2
alt1 = alt2
self.log(3,
"Phase 4 set: Restrict set date to dt={0} alt={1} to dt={2} alt={3}".format(dt0, alt0,
dt1,
alt1))
dt_set = dt0
# rise
dt0 = dt_culmination
alt0 = alt_t(dt_culmination)
alt1 = alt_t(dt_culmination - step)
while alt1 >= 0 and dt0 > -86400:
self.log(3, "Phase 4 rise: Increasing at dt0={0}, alt0={1}, alt1={2}".format(dt0, alt0, alt1))
alt0 = alt1
dt0 = dt0 - step
alt1 = alt_t(dt0 - step)
if dt0 < -86400:
self.log(3, "no rise today")
else:
dt1 = dt0 - step
while dt0 - dt1 > smaller_step:
dt2 = (dt0 + dt1) / 2
alt2 = alt_t(dt2)
if alt2 > 0:
dt0 = dt2
alt0 = alt2
else:
dt1 = dt2
alt1 = alt2
self.log(3,
"Phase 4 rise: Restrict set date to dt={0} alt={1} to dt={2} alt={3}".format(dt0, alt0,
dt1,
alt1))
dt_rise = dt0
# meridian
if (dt_rise is not None) and (dt_set is not None):
dt0 = dt_rise
dt1 = dt0 + (dt_set - dt_rise) / trajectory_segments
az0 = az_t(dt0)
az1 = az_t(dt1)
while dt1 < dt_set and az0 * az1 > 0: # equivalent to sign(az0)==sign(az1)
dt0 = dt1
dt1 = dt0 + (dt_set - dt_rise) / trajectory_segments
az0 = az1
az1 = az_t(dt1)
self.log(3,
"Phase 5a: Segment dt={0} az={1} to dt={2} az={3}".format(dt0, az0, dt1, az1))
if dt1 >= dt_set:
self.log(3, "no meridian crossing")
else:
while dt1 - dt0 > smaller_step:
dt2 = (dt0 + dt1) / 2
az2 = az_t(dt2)
if az2 * az0 > 0:
dt0 = dt2
az0 = az2
else:
dt1 = dt2
az1 = az2
self.log(3,
"Phase 5b: Restrict meridian crossing date to dt={0} az={1} to dt={2} az={3}"
.format(dt0, az0, dt1, az1))
dt_meridian = dt0
if dt_rise is not None:
t_rise = self.ts.tt(jd=t0 + dt_rise / 86400)
alt_rise = Angle(radians=alt_t(dt_rise), preference="degrees")
az_rise = Angle(radians=az_t(dt_rise), preference="degrees")
else:
t_rise = None
alt_rise = None
az_rise = None
if dt_culmination is not None:
t_culmination = self.ts.tt(jd=t0 + dt_culmination / 86400)
alt_culmination = Angle(radians=alt_t(dt_culmination), preference="degrees")
az_culmination = Angle(radians=az_t(dt_culmination), preference="degrees")
else:
t_culmination = None
alt_culmination = None
az_culmination = None
if dt_meridian is not None:
t_meridian = self.ts.tt(jd=t0 + dt_meridian / 86400)
alt_meridian = Angle(radians=alt_t(dt_meridian), preference="degrees")
az_meridian = Angle(radians=az_t(dt_meridian), preference="degrees")
else:
t_meridian = None
alt_meridian = None
az_meridian = None
if dt_set is not None:
t_set = self.ts.tt(jd=t0 + dt_set / 86400)
alt_set = Angle(radians=alt_t(dt_set), preference="degrees")
az_set = Angle(radians=az_t(dt_set), preference="degrees")
else:
t_set = None
alt_set = None
az_set = None
self.log(2,
"pass found:\n"
" rise: dt={0} az={1}\n"
" meridian: dt={2} az={3} alt={4}\n"
" culmination: dt={5} az={6} alt={7}\n"
" set: dt={8} az={9}"
.format(dt_rise, az_rise,
dt_meridian, az_meridian, alt_meridian,
dt_culmination, az_culmination, alt_culmination,
dt_set, az_set))
return (
(t_rise, t_culmination, t_meridian, t_set, t0),
(alt_rise, alt_culmination, alt_meridian, alt_set),
(az_rise, az_culmination, az_meridian, az_set))
# INDI connection
def connect(self):
if self.is_connected():
self.log(1, "Already connected")
return None
self.log(2, "Connecting to " + self.indi_server_ip + ":" + str(self.indi_port))
self.indiclient.setServer(self.indi_server_ip, self.indi_port)
if not (self.indiclient.connectServer()):
self.log(0, "Connection error")
return None
if self.ui is not None:
self.ui.connected()
self.log(2, "Connected")
def disconnect(self):
if not self.is_connected():
self.log(1, "Already disconnected")
return None
if self.tracking:
self.stop_tracking()
if not (self.indiclient.disconnectServer()):
self.log(0, "Error during disconnection")
return None
if self.ui is not None:
self.ui.disconnected()
self.log(2, "Disconnection successful")
def is_connected(self):
return self.indiclient.isServerConnected()
def set_ui(self, ui):
self.ui = ui
# Tracking
def start_tracking(self):
if not self.indiclient.telescope_features["minimal"]:
self.log(0, "Tracking cannot be started: no telescope connected")
return
self.tracking = True
if self.ui is not None:
self.ui.tracking_started()
def stop_tracking(self):
if self.tracking:
self.tracking = False
self.indiclient.set_speed(0, 0)
if self.ui is not None:
self.ui.tracking_stopped()
def update_joystick_offset(self, nvp):
# this procedure is called by indiclient each time the joystick input are updated.
if self.joystick_mapping is None:
return
if len(nvp) != len(self.joystick_mapping):
self.log(1, "The joystick configuration does not correspond to the actual joystick. "
"Please reconfigure the joystick")
self.joystick_mapping = None
return
# integrate until current time
self.integrate_offset()
self.offset_joystick_speed_dec = 0 # deg/s
self.offset_joystick_speed_ra = 0
self.offset_joystick_speed_FB = 0 # Front / Back
self.offset_joystick_speed_LR = 0 # Left / Right
self.offset_joystick_speed_time = 0 # s/s
for i in range(len(self.joystick_mapping)):
# apply dead-band and expo
if nvp[i].value < -self.joystick_deadband:
tmp = nvp[i].value + self.joystick_deadband
elif nvp[i].value > self.joystick_deadband:
tmp = nvp[i].value - self.joystick_deadband
else:
continue
tmp = tmp / 32767.
tmp = tmp * abs(tmp) ** 3 * self.joystick_expo + tmp * (1 - self.joystick_expo)
# apply mapping and inversion
if self.joystick_mapping[i][0] == 1:
self.offset_joystick_speed_dec += tmp * self.joystick_speed * (1 if self.joystick_mapping[i][1] else -1)
elif self.joystick_mapping[i][0] == 2:
self.offset_joystick_speed_ra += tmp * self.joystick_speed * (1 if self.joystick_mapping[i][1] else -1)
elif self.joystick_mapping[i][0] == 3:
self.offset_joystick_speed_FB += tmp * self.joystick_speed * (1 if self.joystick_mapping[i][1] else -1)
elif self.joystick_mapping[i][0] == 4:
self.offset_joystick_speed_LR += tmp * self.joystick_speed * (1 if self.joystick_mapping[i][1] else -1)
elif self.joystick_mapping[i][0] == 5:
self.offset_joystick_speed_time += tmp * self.joystick_speed / 360 * 86400 * (
1 if self.joystick_mapping[i][1] else -1)
def update_ui_offset(self, north_south, east_west, front_back, left_right, future_past):
"""
this procedure is called by the UI each time a move button is pressed or released.
:param north_south: +1 for north, -1 for south, 0 for no move
:param east_west: ...
:param front_back: ...
:param left_right: ...
:param future_past: ...
"""
# integrate until current time
self.integrate_offset()
self.offset_ui_speed_dec = self.joystick_speed * north_south
self.offset_ui_speed_ra = self.joystick_speed * east_west
self.offset_ui_speed_FB = self.joystick_speed * front_back
self.offset_ui_speed_LR = self.joystick_speed * left_right
self.offset_ui_speed_time = self.joystick_speed / 360 * 86400 * future_past
def integrate_offset(self):
# It integrates the offsets that are then applied in update_tracking()
if self.t_offset_integration is None:
self.t_offset_integration = self.t()
self.offset_dec = 0.
self.offset_ra = 0.
self.offset_FB = 0.
self.offset_LR = 0.
self.offset_time = 0.
return
t = self.t()
t_1 = self.t_offset_integration
dt = (t - t_1) * 86400.
self.offset_dec += dt * (self.offset_joystick_speed_dec + self.offset_ui_speed_dec)
self.offset_ra += dt * (self.offset_joystick_speed_ra + self.offset_ui_speed_ra)
self.offset_FB += dt * (self.offset_joystick_speed_FB + self.offset_ui_speed_FB)
self.offset_LR += dt * (self.offset_joystick_speed_LR + self.offset_ui_speed_LR)
self.offset_time += dt * (self.offset_joystick_speed_time + self.offset_ui_speed_time)
self.t_offset_integration = t
# noinspection PyProtectedMember
def update_tracking(self, current_ra, current_dec):
# This procedure is called each time the telescope coordinates are updated
if not self.tracking:
return
# NOTE: all calculation are in deg
# target location and speed
target_ra, target_dec, alt = self.sat_pos()
target_ra_1, target_dec_1, alt = self.sat_pos(self.ts.tt(jd=self.t().tt + 1. / 86400.)) # at t + 1s
target_speed_ra = target_ra_1._degrees - target_ra._degrees
target_speed_dec = target_dec_1._degrees - target_dec._degrees
diff_ra = target_ra._degrees - current_ra * 15. + self.offset_ra
diff_dec = target_dec._degrees - current_dec + self.offset_dec
if diff_ra > 180:
diff_ra -= 360
if diff_ra < -180:
diff_ra += 360
if diff_dec > 180:
diff_dec -= 360
if diff_dec < -180:
diff_dec += 360
speed_ra = self.p_gain * -diff_ra -target_speed_ra + self.offset_joystick_speed_ra + 360./86164.
speed_dec = self.p_gain * diff_dec + target_speed_dec + self.offset_joystick_speed_dec
self.log(3,
"\ntime: {0}\ntarget:{1} / {2}\ncurrent: {3} / {4}\ndiff: {5} / {6}\ntarget speed: {7} / {8}\n"
"command speed: {9} / {10}\noffset: {11} / {12}\noffset rate {13} / {14} ".format(
self.t_iso(), target_ra, target_dec, Angle(hours=current_ra), Angle(degrees=current_dec),
Angle(degrees=diff_ra), Angle(degrees=diff_dec), target_speed_ra, target_speed_dec, speed_ra,
speed_dec, self.offset_ra, self.offset_dec, self.offset_joystick_speed_ra, self.offset_joystick_speed_dec))
# Clip to max speed
if speed_ra > abs(self.max_speed_ra):
speed_ra = abs(self.max_speed_ra)
if speed_dec > abs(self.max_speed_de):
speed_dec = abs(self.max_speed_de)
if speed_ra < -abs(self.max_speed_ra):
speed_ra = -abs(self.max_speed_ra)
if speed_dec < -abs(self.max_speed_de):
speed_dec = -abs(self.max_speed_de)
# inverse direction if negative max speed
if self.max_speed_ra < 0:
speed_ra = -speed_ra
if self.max_speed_de < 0:
speed_dec = -speed_dec
self.indiclient.set_speed(speed_ra, speed_dec)
self.integrate_offset()
def goto_altaz(self, alt: Angle, az: Angle):
ra,dec=self.altaz2radec_2(alt,az)
self.log(2,"Goto alt/az {0} / {1}, ra/dec {2} / {3}".format(str(alt),str(az),str(ra),str(dec)))
self.indiclient.goto(ra,dec)
class CatalogItem(object):
""" Satellite catalog item """
def __init__(self, name, url, active=False):
self.name = name
self.url = url
self.active = active
def filename(self):
return urlparse(self.url).path.split('/')[-1]
class Error(Exception):
pass