8
8
import logging
9
9
import os
10
10
from pathlib import Path
11
- from typing import (
12
- Any ,
13
- Dict ,
14
- Optional ,
15
- Set ,
16
- Union ,
17
- Tuple ,
18
- Iterable ,
19
- Iterator ,
20
- )
11
+ from typing import Any , Dict , Optional , Set , Union , Tuple , Iterable , Iterator , cast
21
12
from opentrons_hardware .firmware_bindings .constants import (
22
13
FirmwareTarget ,
23
14
NodeId ,
33
24
34
25
_DEFAULT_PCBA_REVS : Final [Dict [FirmwareTarget , str ]] = {
35
26
NodeId .head : "c2" ,
36
- NodeId .head_bootloader : "c2" ,
37
27
NodeId .gantry_x : "c1" ,
38
- NodeId .gantry_x_bootloader : "c1" ,
39
28
NodeId .gantry_y : "c1" ,
40
- NodeId .gantry_y_bootloader : "c1" ,
41
29
NodeId .gripper : "c1" ,
42
- NodeId .gripper_bootloader : "c1" ,
43
30
USBTarget .rear_panel : "b1" ,
44
31
}
45
32
@@ -64,6 +51,8 @@ class FirmwareUpdateType(Enum):
64
51
pipettes_96 = 6
65
52
rear_panel = 7
66
53
unknown = - 1
54
+ unknown_no_subtype = - 2
55
+ unknown_no_revision = - 3
67
56
68
57
def __str__ (self ) -> str :
69
58
"""Name of enum."""
@@ -101,7 +90,14 @@ def from_node(cls, node: NodeId) -> "FirmwareUpdateType":
101
90
NodeId .gantry_y : cls .gantry_y ,
102
91
NodeId .gripper : cls .gripper ,
103
92
}
104
- return lookup .get (node , cls .unknown )
93
+ return lookup [node .application_for ()]
94
+
95
+ @classmethod
96
+ def from_node_info (cls , node : NodeId , subid : int ) -> "FirmwareUpdateType" :
97
+ """Build an update type from a node and subid."""
98
+ if node .application_for () in (NodeId .pipette_left , NodeId .pipette_right ):
99
+ return cls .from_pipette (PipetteType (subid ))
100
+ return cls .from_node (node )
105
101
106
102
@classmethod
107
103
def from_pipette (cls , pipette : PipetteType ) -> "FirmwareUpdateType" :
@@ -194,36 +190,48 @@ def _revision_for_core_or_gripper(device_info: DeviceInfoCache) -> str:
194
190
because PCBAs of the default revision were built before revision handling was
195
191
introduced, and cannot be updated because too many were made.
196
192
"""
197
- return device_info .revision .main or _DEFAULT_PCBA_REVS [device_info .target ]
193
+ return (
194
+ device_info .revision .main
195
+ or _DEFAULT_PCBA_REVS [device_info .target .application_for ()]
196
+ )
198
197
199
198
200
199
def _revision_for_pipette (
201
- pipette_type : PipetteType , device_info : DeviceInfoCache
200
+ device_info : DeviceInfoCache , fallback_pipette_type : PipetteType
202
201
) -> str :
203
202
"""Returns the appropriate defaulted revision for a pipette.
204
203
205
204
The default revision can be determined solely from the pipette type. This is
206
205
needed because PCBAs of the default revision were built before revision handling
207
206
was introduced, and cannot be updated because too many were made.
208
207
"""
208
+ try :
209
+ pipette_type = PipetteType (device_info .subidentifier )
210
+ except ValueError :
211
+ pipette_type = fallback_pipette_type
209
212
return device_info .revision .main or _DEFAULT_PCBA_REVS_PIPETTE [pipette_type ]
210
213
211
214
215
+ def _revision (version_cache : DeviceInfoCache ) -> str :
216
+ if version_cache .target .application_for () in (
217
+ NodeId .pipette_left ,
218
+ NodeId .pipette_right ,
219
+ ):
220
+ return _revision_for_pipette (
221
+ version_cache , PipetteType (version_cache .subidentifier )
222
+ )
223
+ else :
224
+ return _revision_for_core_or_gripper (version_cache )
225
+
226
+
212
227
def _update_details_for_device (
213
- attached_pipettes : Dict [NodeId , PipetteType ],
214
228
version_cache : DeviceInfoCache ,
215
229
) -> Tuple [FirmwareUpdateType , str ]:
216
230
if version_cache .target in NodeId :
217
231
node = NodeId (version_cache .target )
218
- if node in attached_pipettes :
219
- pipette_type = attached_pipettes [node ]
220
- return FirmwareUpdateType .from_pipette (pipette_type ), _revision_for_pipette (
221
- pipette_type , version_cache
222
- )
223
- else :
224
- return FirmwareUpdateType .from_node (node ), _revision_for_core_or_gripper (
225
- version_cache
226
- )
232
+ return FirmwareUpdateType .from_node_info (
233
+ node , version_cache .subidentifier
234
+ ), _revision (version_cache )
227
235
else :
228
236
return FirmwareUpdateType .from_usb_target (
229
237
USBTarget (version_cache .target )
@@ -233,22 +241,27 @@ def _update_details_for_device(
233
241
def _update_type_for_device (
234
242
attached_pipettes : Dict [NodeId , PipetteType ],
235
243
version_cache : DeviceInfoCache ,
236
- ) -> Union [ Tuple [FirmwareUpdateType , str ], Tuple [ None , None ] ]:
244
+ ) -> Tuple [FirmwareUpdateType , str ]:
237
245
try :
238
- update_type , revision = _update_details_for_device (
239
- attached_pipettes , version_cache
240
- )
246
+ update_type , revision = _update_details_for_device (version_cache )
241
247
except KeyError :
242
- pipette_type = (
243
- attached_pipettes .get (NodeId (version_cache .target ), None )
244
- if version_cache .target in NodeId
245
- else None
248
+ log .error (
249
+ f"Node { version_cache .target .name } has no revision or default revision and cannot be updated"
246
250
)
251
+ return (FirmwareUpdateType .unknown_no_revision , "" )
252
+ except ValueError :
253
+ # This means we did not have a valid pipette type in the version cache, so fall back to the passed-in
254
+ # attached_pipettes data.
255
+ # TODO: Delete this fallback when all pipettes have subidentifiers in their bootloaders.
256
+ if version_cache .target in attached_pipettes :
257
+ pipette_type = attached_pipettes [cast (NodeId , version_cache .target )]
258
+ return FirmwareUpdateType .from_pipette (pipette_type ), _revision_for_pipette (
259
+ version_cache , pipette_type
260
+ )
247
261
log .error (
248
- f"Node { version_cache .target .name } (pipette { pipette_type } ) "
249
- "has no revision or default revision"
262
+ f"Target { version_cache .target .name } has no known subtype and cannot be updated"
250
263
)
251
- return None , None
264
+ return ( FirmwareUpdateType . unknown_no_subtype , "" )
252
265
return update_type , revision
253
266
254
267
@@ -259,15 +272,7 @@ def _update_types_from_devices(
259
272
for version_cache in devices :
260
273
log .debug (f"Checking firmware update for { version_cache .target .name } " )
261
274
# skip pipettes that dont have a PipetteType
262
- node_id = version_cache .target
263
- if node_id in [
264
- NodeId .pipette_left ,
265
- NodeId .pipette_right ,
266
- ] and not attached_pipettes .get (NodeId (node_id )):
267
- continue
268
275
update_type , rev = _update_type_for_device (attached_pipettes , version_cache )
269
- if not rev or not update_type :
270
- continue
271
276
yield (version_cache , update_type , rev )
272
277
273
278
@@ -284,6 +289,9 @@ def _should_update(
284
289
update_info : UpdateInfo ,
285
290
force : bool ,
286
291
) -> bool :
292
+ if version_cache .target .is_bootloader ():
293
+ log .info (f"Update required for { version_cache .target } (in bootloader)" )
294
+ return True
287
295
if force :
288
296
log .info (f"Update required for { version_cache .target } (forced)" )
289
297
return True
@@ -303,7 +311,7 @@ def _update_info_for_type(
303
311
update_type : Optional [FirmwareUpdateType ],
304
312
) -> Optional [UpdateInfo ]:
305
313
# Given the update_type find the corresponding updateInfo, monadically on update_info
306
- if not update_type :
314
+ if not update_type or update_type . value <= FirmwareUpdateType . unknown . value :
307
315
return None
308
316
try :
309
317
update_info = known_firmware [update_type ]
@@ -337,6 +345,8 @@ def _info_for_required_updates(
337
345
yield version_cache .target , update_info .version , update_info .files_by_revision , rev
338
346
339
347
348
+ # TODO: When we're confident that all pipettes report their type in the device info subidentifier
349
+ # field, both in application and bootloader, we can remove the attached_pipettes from this codepath.
340
350
def check_firmware_updates (
341
351
device_info : Dict [FirmwareTarget , DeviceInfoCache ],
342
352
attached_pipettes : Dict [NodeId , PipetteType ],
0 commit comments