|
| 1 | +import sympy |
| 2 | +import numpy as np |
| 3 | +from typing import List |
| 4 | + |
| 5 | +from .constants import EARTH_ROTATION_RATE, SPEED_OF_LIGHT |
| 6 | +from .helpers import ConstellationId |
| 7 | +from .raw_gnss import GNSSMeasurement |
| 8 | + |
| 9 | + |
| 10 | +def gauss_newton(fun, b, xtol=1e-8, max_n=25): |
| 11 | + for _ in range(max_n): |
| 12 | + # Compute function and jacobian on current estimate |
| 13 | + r, J = fun(b) |
| 14 | + |
| 15 | + # Update estimate |
| 16 | + delta = np.linalg.pinv(J) @ r |
| 17 | + b -= delta |
| 18 | + |
| 19 | + # Check step size for stopping condition |
| 20 | + if np.linalg.norm(delta) < xtol: |
| 21 | + break |
| 22 | + return b |
| 23 | + |
| 24 | + |
| 25 | +def calc_pos_fix(measurements, posfix_functions=None, x0=None, no_weight=False, signal='C1C', min_measurements=6): |
| 26 | + ''' |
| 27 | + Calculates gps fix using gauss newton method |
| 28 | + To solve the problem a minimal of 4 measurements are required. |
| 29 | + If Glonass is included 5 are required to solve for the additional free variable. |
| 30 | + returns: |
| 31 | + 0 -> list with positions |
| 32 | + 1 -> pseudorange errs |
| 33 | + ''' |
| 34 | + if x0 is None: |
| 35 | + x0 = [0, 0, 0, 0, 0] |
| 36 | + |
| 37 | + if len(measurements) < min_measurements: |
| 38 | + return [],[] |
| 39 | + |
| 40 | + Fx_pos = pr_residual(measurements, posfix_functions, signal=signal, no_weight=no_weight, no_nans=True) |
| 41 | + x = gauss_newton(Fx_pos, x0) |
| 42 | + residual, _ = Fx_pos(x, no_weight=True) |
| 43 | + return x.tolist(), residual.tolist() |
| 44 | + |
| 45 | + |
| 46 | +def calc_vel_fix(measurements, est_pos, velfix_function=None, v0=None, no_weight=False, signal='D1C', min_measurements=6): |
| 47 | + ''' |
| 48 | + Calculates gps velocity fix using gauss newton method |
| 49 | + returns: |
| 50 | + 0 -> list with velocities |
| 51 | + 1 -> pseudorange_rate errs |
| 52 | + ''' |
| 53 | + if v0 is None: |
| 54 | + v0 = [0, 0, 0, 0] |
| 55 | + |
| 56 | + if len(measurements) < min_measurements: |
| 57 | + return [], [] |
| 58 | + |
| 59 | + Fx_vel = prr_residual(measurements, est_pos, velfix_function, signal=signal, no_weight=no_weight, no_nans=True) |
| 60 | + v = gauss_newton(Fx_vel, v0) |
| 61 | + residual, _ = Fx_vel(v, no_weight=True) |
| 62 | + return v.tolist(), residual.tolist() |
| 63 | + |
| 64 | + |
| 65 | +def get_posfix_sympy_fun(constellation): |
| 66 | + # Unknowns |
| 67 | + x, y, z = sympy.Symbol('x'), sympy.Symbol('y'), sympy.Symbol('z') |
| 68 | + bc = sympy.Symbol('bc') |
| 69 | + bg = sympy.Symbol('bg') |
| 70 | + zero_theta = sympy.Symbol('zero_theta') |
| 71 | + var = [x, y, z, bc, bg] |
| 72 | + |
| 73 | + # Knowns |
| 74 | + pr = sympy.Symbol('pr') |
| 75 | + sat_x, sat_y, sat_z = sympy.Symbol('sat_x'), sympy.Symbol('sat_y'), sympy.Symbol('sat_z') |
| 76 | + weight = sympy.Symbol('weight') |
| 77 | + |
| 78 | + theta = (EARTH_ROTATION_RATE * (pr - bc) / SPEED_OF_LIGHT)*zero_theta |
| 79 | + val = sympy.sqrt( |
| 80 | + (sat_x * sympy.cos(theta) + sat_y * sympy.sin(theta) - x) ** 2 + |
| 81 | + (sat_y * sympy.cos(theta) - sat_x * sympy.sin(theta) - y) ** 2 + |
| 82 | + (sat_z - z) ** 2 |
| 83 | + ) |
| 84 | + |
| 85 | + if constellation == ConstellationId.GLONASS: |
| 86 | + res = weight * (val - (pr - bc - bg)) |
| 87 | + elif constellation == ConstellationId.GPS: |
| 88 | + res = weight * (val - (pr - bc)) |
| 89 | + else: |
| 90 | + raise NotImplementedError(f"Constellation {constellation} not supported") |
| 91 | + |
| 92 | + res = [res] + [sympy.diff(res, v) for v in var] |
| 93 | + |
| 94 | + return sympy.lambdify([x, y, z, bc, bg, pr, zero_theta, sat_x, sat_y, sat_z, weight], res, modules=["numpy"]) |
| 95 | + |
| 96 | + |
| 97 | +def get_velfix_sympy_func(): |
| 98 | + # implementing this without sympy.Matrix gives a 2x speedup at generation |
| 99 | + |
| 100 | + # knowns, receiver position, satellite position, satellite velocity |
| 101 | + ep_x, ep_y, ep_z = sympy.Symbol('ep_x'), sympy.Symbol('ep_y'), sympy.Symbol('ep_z') |
| 102 | + est_pos = np.array([ep_x, ep_y, ep_z]) |
| 103 | + sp_x, sp_y, sp_z = sympy.Symbol('sp_x'), sympy.Symbol('sp_y'), sympy.Symbol('sp_z') |
| 104 | + sat_pos = np.array([sp_x, sp_y, sp_z]) |
| 105 | + sv_x, sv_y, sv_z = sympy.Symbol('sv_x'), sympy.Symbol('sv_y'), sympy.Symbol('sv_z') |
| 106 | + sat_vel = np.array([sv_x, sv_y, sv_z]) |
| 107 | + observables = sympy.Symbol('observables') |
| 108 | + weight = sympy.Symbol('weight') |
| 109 | + |
| 110 | + # unknown, receiver velocity |
| 111 | + v_x, v_y, v_z = sympy.Symbol('v_x'), sympy.Symbol('v_y'), sympy.Symbol('v_z') |
| 112 | + vel = np.array([v_x, v_y, v_z]) |
| 113 | + vel_o = sympy.Symbol('vel_o') |
| 114 | + |
| 115 | + loss = sat_pos - est_pos |
| 116 | + loss /= sympy.sqrt(loss.dot(loss)) |
| 117 | + |
| 118 | + nv = loss.dot(sat_vel - vel) |
| 119 | + ov = (observables - vel_o) |
| 120 | + res = (nv - ov)*weight |
| 121 | + |
| 122 | + res = [res] + [sympy.diff(res, v) for v in [v_x, v_y, v_z, vel_o]] |
| 123 | + |
| 124 | + return sympy.lambdify([ |
| 125 | + ep_x, ep_y, ep_z, sp_x, sp_y, sp_z, |
| 126 | + sv_x, sv_y, sv_z, observables, weight, |
| 127 | + v_x, v_y, v_z, vel_o |
| 128 | + ], |
| 129 | + res, modules=["numpy"]) |
| 130 | + |
| 131 | + |
| 132 | +def pr_residual(measurements: List[GNSSMeasurement], posfix_functions=None, signal='C1C', no_weight=False, no_nans=False): |
| 133 | + |
| 134 | + if posfix_functions is None: |
| 135 | + posfix_functions = {constellation: get_posfix_sympy_fun(constellation) for constellation in (ConstellationId.GPS, ConstellationId.GLONASS)} |
| 136 | + |
| 137 | + def Fx_pos(inp, no_weight=no_weight): |
| 138 | + vals, gradients = [], [] |
| 139 | + |
| 140 | + for meas in measurements: |
| 141 | + if signal in meas.observables_final and np.isfinite(meas.observables_final[signal]): |
| 142 | + pr = meas.observables_final[signal] |
| 143 | + sat_pos = meas.sat_pos_final |
| 144 | + zero_theta = 0 |
| 145 | + elif signal in meas.observables and np.isfinite(meas.observables[signal]) and meas.processed: |
| 146 | + pr = meas.observables[signal] |
| 147 | + pr += meas.sat_clock_err * SPEED_OF_LIGHT |
| 148 | + sat_pos = meas.sat_pos |
| 149 | + zero_theta = 1 |
| 150 | + else: |
| 151 | + if not no_nans: |
| 152 | + vals.append(np.nan) |
| 153 | + gradients.append(np.nan) |
| 154 | + continue |
| 155 | + |
| 156 | + w = 1.0 if no_weight or meas.observables_std[signal] == 0 else (1 / meas.observables_std[signal]) |
| 157 | + val, *gradient = posfix_functions[meas.constellation_id](*inp, pr, zero_theta, *sat_pos, w) |
| 158 | + vals.append(val) |
| 159 | + gradients.append(gradient) |
| 160 | + return np.asarray(vals), np.asarray(gradients) |
| 161 | + return Fx_pos |
| 162 | + |
| 163 | + |
| 164 | +def prr_residual(measurements: List[GNSSMeasurement], est_pos, velfix_function=None, signal='D1C', no_weight=False, no_nans=False): |
| 165 | + |
| 166 | + if velfix_function is None: |
| 167 | + velfix_function = get_velfix_sympy_func() |
| 168 | + |
| 169 | + def Fx_vel(vel, no_weight=no_weight): |
| 170 | + vals, gradients = [], [] |
| 171 | + |
| 172 | + for meas in measurements: |
| 173 | + if signal not in meas.observables or not np.isfinite(meas.observables[signal]): |
| 174 | + if not no_nans: |
| 175 | + vals.append(np.nan) |
| 176 | + gradients.append(np.nan) |
| 177 | + continue |
| 178 | + |
| 179 | + sat_pos = meas.sat_pos_final if meas.corrected else meas.sat_pos |
| 180 | + weight = 1.0 if no_weight or meas.observables_std[signal] == 0 else (1 / meas.observables_std[signal]) |
| 181 | + |
| 182 | + val, *gradient = velfix_function(est_pos[0], est_pos[1], est_pos[2], |
| 183 | + sat_pos[0], sat_pos[1], sat_pos[2], |
| 184 | + meas.sat_vel[0], meas.sat_vel[1], meas.sat_vel[2], |
| 185 | + meas.observables[signal], weight, |
| 186 | + vel[0], vel[1], vel[2], vel[3]) |
| 187 | + vals.append(val) |
| 188 | + gradients.append(gradient) |
| 189 | + |
| 190 | + return np.asarray(vals), np.asarray(gradients) |
| 191 | + return Fx_vel |
0 commit comments