-
Notifications
You must be signed in to change notification settings - Fork 0
/
NanoSight_Automation.py
2123 lines (1438 loc) · 85.6 KB
/
NanoSight_Automation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import wx
import threading
from pywinauto.application import Application
import pywinauto
import time
import re
import serial
import serial.tools.list_ports
import pandas
import os
import uiautomation as automation
import errno
import wx.lib.agw.genericmessagedialog as GMD
__version__ = "1.0.0"
EVT_PULLED_PLUG = wx.NewId()
EVT_THREAD_ABORTED = wx.NewId()
## Make new events so the thread can signal the main GUI.
class PulledPlugEvent(wx.PyEvent):
def __init__(self):
wx.PyEvent.__init__(self)
self.SetEventType(EVT_PULLED_PLUG)
class ThreadAbortedEvent(wx.PyEvent):
def __init__(self):
wx.PyEvent.__init__(self)
self.SetEventType(EVT_THREAD_ABORTED)
## Class taken from https://wiki.wxpython.org/LongRunningTasks and modified.
# Thread class that executes processing
class BatchThread(threading.Thread):
"""Batch Thread Class."""
def __init__(self, batch_data):
"""Init Serial Thread Class."""
threading.Thread.__init__(self)
self.batch_data = batch_data
self.sample_df = batch_data.sample_df
self.want_abort = False
self.daemon = True
# This starts the thread running on creation, but you could
# also make the GUI thread responsible for calling this
self.start()
def run(self):
"""Run Batch Thread."""
## Create save directories.
if not self.Create_Save_Directories():
wx.PostEvent(self.batch_data, ThreadAbortedEvent())
return
## Connect to Arduino, NTA, and CETAC.
Arduino_connect_result = self.Connect_To_Arduino()
if not Arduino_connect_result or not self.Connect_To_NTA() or not self.Connect_To_CETAC():
if Arduino_connect_result:
self.ComPort.close()
wx.PostEvent(self.batch_data, ThreadAbortedEvent())
return
## Reset output latch in Arduino.
## This reset of the latch is to make sure the latch starts from a known state before the batch begins.
while True:
send_response = self.Reset_AS_Output_Latch()
if send_response == "Latch Cleared":
break
elif send_response == "Abort":
self.Abort_Clean_Up()
return
elif send_response == "Pulled Plug":
self.PP_Clean_Up()
return
elif send_response == "Time Out":
message = "Time out reached while trying to communicate with the Arduino. \nCheck that the Arduino is functioning properly. \nClick Yes to try sending the signal again."
msg_dlg = wx.MessageDialog(None, message, "Warning", wx.YES_NO | wx.ICON_QUESTION)
answer = msg_dlg.ShowModal()
msg_dlg.Destroy()
if not answer == wx.ID_YES:
self.Abort_Clean_Up()
return
## Make sure CETAC is connected to the autosampler.
if not self.CETAC_Check_For_COM():
self.ComPort.close()
wx.PostEvent(self.batch_data, ThreadAbortedEvent())
return
## Make sure CETAC has a script loaded.
if not self.CETAC_Check_For_Active_Script():
self.ComPort.close()
wx.PostEvent(self.batch_data, ThreadAbortedEvent())
return
######################
## Acquire the samples.
######################
for i in range(len(self.sample_df)):
## Before starting make sure the user hasn't already wanted to abort.
if self.want_abort:
self.Abort_Clean_Up()
return
## Set the label in acquistion progress to in progress and change color to blue.
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "In Progress")
self.batch_data.sample_list_ctrl.SetItemBackgroundColour(i, "light blue")
## Set base filename for sample.
if not self.NTA_Set_Filename(self.sample_df.loc[:, "Save Directory"][i], self.sample_df.loc[:, "Sample Name"][i]):
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Set up correct script in autosampler and NanoSight.
if not self.NTA_Load_Script(self.sample_df.loc[:, "Acquire Script"][i]):
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Check for abort again.
if self.want_abort:
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Flush the input buffer before starting the autosampler script to make sure
## signals from a previous run or false signals aren't misinterpreted.
self.ComPort.reset_input_buffer()
self.ComPort.flushInput()
## If this is the first sample start the script, otherwise send trigger to MVX if it is ready.
if i == 0:
## Close any windows that may be left open by the user.
self.CETAC_Close_All_Windows()
## Start autosampler script.
run_response = self.CETAC_Run_Script()
if run_response == "Run Script Button Disabled" or run_response == "Abort Script Button Disabled" or run_response == "No Samples Selected":
self.ComPort.close()
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
wx.PostEvent(self.batch_data, ThreadAbortedEvent())
return
## Check for errors after starting the script.
if self.CETAC_Check_For_Errors():
self.ComPort.close()
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
wx.PostEvent(self.batch_data, ThreadAbortedEvent())
return
else:
## Check for errors.
if self.CETAC_Check_For_Errors():
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Wait for signal from autosampler.
timeout = 15
listen_response = self.Check_AS_Output_Latch(timeout)
## Check to see if the batch was aborted while listening or the Arduino was disconnected.
if listen_response == "Abort" or listen_response == "Time Out":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
elif listen_response == "Pulled Plug":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.PP_Clean_Up()
return
## Wait some time before sending signal to autosampler to make sure
## it has had time to move on to the trigger command, and so we do
## not relatch the same output signal twice.
time.sleep(6)
## Reset output latch in Arduino.
## If something goes wrong with the Arduino at this point then it can
## be fixed or replaced and the batch can continue so allow the user to retry sending the signal to the Arduino.
while True:
send_response = self.Reset_AS_Output_Latch()
if send_response == "Latch Cleared":
break
elif send_response == "Abort":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
elif send_response == "Pulled Plug":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.PP_Clean_Up()
return
elif send_response == "Time Out":
message = "Time out reached while trying to communicate with the Arduino. \nCheck that the Arduino is functioning properly. \nClick Yes to try sending the signal again."
msg_dlg = wx.MessageDialog(None, message, "Warning", wx.YES_NO | wx.ICON_QUESTION)
answer = msg_dlg.ShowModal()
msg_dlg.Destroy()
if not answer == wx.ID_YES:
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Send signal to autosampler.
## If something goes wrong with the Arduino at this point then it can
## be fixed or replaced and the batch can continue so allow the user to retry sending the signal to the AS.
while True:
send_response = self.Send_Signal_To_AS()
if send_response == "Signal Sent":
break
elif send_response == "Abort":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
elif send_response == "Pulled Plug":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.PP_Clean_Up()
return
elif send_response == "Time Out":
message = "Time out reached while trying to signal the autosampler. \nCheck that the Arduino is functioning properly. \nClick Yes to try sending the signal again."
msg_dlg = wx.MessageDialog(None, message, "Warning", wx.YES_NO | wx.ICON_QUESTION)
answer = msg_dlg.ShowModal()
msg_dlg.Destroy()
if not answer == wx.ID_YES:
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Check that script is still running in CETAC.
## If it is not then the most likely cause is that less samples
## were selected in Select Sample Set than were in the sample list file.
time.sleep(5)
if not self.CETAC_Is_Script_Running():
message = "The CETAC script is no longer running. The most likely cause is that there were more samples in the Sample List File than were selected in Select Sample Set."
msg_dlg = wx.MessageDialog(None, message, "Error", wx.OK | wx.ICON_ERROR)
answer = msg_dlg.ShowModal()
msg_dlg.Destroy()
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Wait for signal from autosampler.
timeout = 15
listen_response = self.Check_AS_Output_Latch(timeout)
## Check to see if the batch was aborted while listening or the Arduino was disconnected.
if listen_response == "Abort" or listen_response == "Time Out":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
elif listen_response == "Pulled Plug":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.PP_Clean_Up()
return
## Start NanoSight script.
if not self.NTA_Run_Script():
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.PP_Clean_Up()
return
## Reset output latch in Arduino.
## If something goes wrong with the Arduino at this point then it can
## be fixed or replaced and the batch can continue so allow the user to retry sending the signal to the Arduino.
## Wait for 5 seconds before resetting latch to make sure the autosampler has turned off its output.
time.sleep(5)
while True:
send_response = self.Reset_AS_Output_Latch()
if send_response == "Latch Cleared":
break
elif send_response == "Abort":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
elif send_response == "Pulled Plug":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.PP_Clean_Up()
return
elif send_response == "Time Out":
message = "Time out reached while trying to communicate with the Arduino. \nCheck that the Arduino is functioning properly. \nClick Yes to try sending the signal again."
msg_dlg = wx.MessageDialog(None, message, "Warning", wx.YES_NO | wx.ICON_QUESTION)
answer = msg_dlg.ShowModal()
msg_dlg.Destroy()
if not answer == wx.ID_YES:
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Wait for end of script signal from NanoSight.
listen_response = self.Listen_For_End_Of_Script_NTA_Signal()
## Check to see if the batch was aborted while listening.
if listen_response == "Aborted":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Set the label in acquistion progress to Complete and change color back to normal.
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Complete")
self.batch_data.sample_list_ctrl.SetItemBackgroundColour(i, "white")
## Give time for NTA to finish completing the script.
time.sleep(5)
######################
## Process the samples.
######################
for i in range(len(self.sample_df)):
if self.want_abort:
wx.PostEvent(self.batch_data, ThreadAbortedEvent())
return
## Set the label in processing progress to in progress and change color to blue.
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Processing Progress"), "In Progress")
self.batch_data.sample_list_ctrl.SetItemBackgroundColour(i, "light blue")
## Once when running 10 standards in a row after the 4th standard was processed the NTA_Check_Existence
## returned False. I am pretty sure it was the check existence call in NTA_Load_Script because the
## program was still in the analysis tab. I think that maybe after exporting there needs to be some
## time before checking existence, so adding the below timer. It should be noted that after the
## existence check failed the NTA program was pretty much locked up, so it might have just been a fluke.
time.sleep(10)
## Turns out it might be that the NTA program can't process more than 4 samples in a row.
## When I ran 4 processings in a row manually the same bug happened.
## Load Process Script
if not self.NTA_Load_Script(self.sample_df.loc[:, "Process Script"][i]):
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Processing Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Open experiment to process.
if not self.NTA_Open_Experiment(self.sample_df.loc[:, "Save Directory"][i], self.sample_df.loc[:, "Sample Name"][i]):
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Processing Progress"), "Cancelled")
self.Abort_Clean_Up()
return
if self.want_abort:
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Processing Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Start NanoSight script.
if not self.NTA_Run_Script():
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Processing Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## The assumption is that the process script will use the PROCESSBASIC command
## Wait for end of script signal from NanoSight.
listen_response = self.Listen_For_End_Of_Script_NTA_Signal()
## Check to see if the batch was aborted while listening.
if listen_response == "Aborted":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Processing Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Export results.
if not self.NTA_Export_Results():
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Processing Progress"), "Cancelled")
self.Abort_Clean_Up()
return
## Set the label in acquistion progress to Complete and change color back to normal.
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Processing Progress"), "Complete")
self.batch_data.sample_list_ctrl.SetItemBackgroundColour(i, "white")
## Give the autosampler the last trigger command so it can end its program.
## Wait for signal from autosampler.
timeout = 15
listen_response = self.Check_AS_Output_Latch(timeout)
## Check to see if the batch was aborted while listening or the Arduino was disconnected.
if listen_response == "Abort" or listen_response == "Time Out":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
elif listen_response == "Pulled Plug":
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.PP_Clean_Up()
return
## Wait some time before sending signal to autosampler to make sure
## it has had time to move on to the trigger command.
time.sleep(6)
## Send signal to autosampler.
## If something goes wrong with the Arduino at this point then it can
## be fixed or replaced and the batch can continue so allow the user to retry sending the signal to the AS.
while True:
send_response = self.Send_Signal_To_AS()
if send_response == "Signal Sent":
break
elif send_response == "Abort":
self.Abort_Clean_Up()
return
elif send_response == "Pulled Plug":
self.PP_Clean_Up()
return
elif send_response == "Time Out":
message = "Time out reached while trying to signal the autosampler. \nCheck that the Arduino is functioning properly. \nClick Yes to try sending the signal again."
msg_dlg = wx.MessageDialog(None, message, "Warning", wx.YES_NO | wx.ICON_QUESTION)
answer = msg_dlg.ShowModal()
msg_dlg.Destroy()
if not answer == wx.ID_YES:
self.Abort_Clean_Up()
return
## Check that script is still running in CETAC.
## If it is then the most likely cause is that more samples
## were selected in Select Sample Set than were in the sample list file.
time.sleep(5)
if self.CETAC_Is_Script_Running():
self.CETAC_Abort_Script()
message = "The CETAC script was still running after completing all samples in the Sample List File. The most likely cause is that there were less samples in the Sample List File than were selected in Select Sample Set."
msg_dlg = wx.MessageDialog(None, message, "Error", wx.OK | wx.ICON_ERROR)
answer = msg_dlg.ShowModal()
msg_dlg.Destroy()
self.batch_data.sample_list_ctrl.SetItem(i, self.batch_data.list_ctrl_col_names.index("Acquisition Progress"), "Cancelled")
self.Abort_Clean_Up()
return
self.ComPort.close()
message = "Batch Complete!"
msg_dlg = wx.MessageDialog(None, message, "Batch Complete", wx.OK)
answer = msg_dlg.ShowModal()
msg_dlg.Destroy()
wx.PostEvent(self.batch_data, ThreadAbortedEvent())
def abort(self):
"""Abort Batch Thread."""
# Method for use by main thread to signal an abort.
self.want_abort = True
def Abort_Clean_Up(self):
"""Calls the functions to abort the NTA and CETAC scripts, closes the
serial connection to the Arduino, and posts the abort event to the main thread."""
self.NTA_Abort_Script()
self.CETAC_Abort_Script()
self.ComPort.close()
wx.PostEvent(self.batch_data, ThreadAbortedEvent())
def PP_Clean_Up(self):
"""Calls the functions to abort the NTA and CETAC scripts, closes the
serial connection to the Arduino, and posts the pulled plug event to the main thread."""
self.NTA_Abort_Script()
self.CETAC_Abort_Script()
self.ComPort.close()
wx.PostEvent(self.batch_data, PulledPlugEvent())
def Create_Save_Directories(self):
"""Create the directories to save the acquisitons and analyses in."""
sample_df = self.sample_df
problem_samples = []
for i in range(len(sample_df)):
directory = sample_df.loc[i, "Save Directory"]
sample_name = sample_df.loc[i, "Sample Name"]
if self.batch_data.samples_have_individual_directories:
directory = os.path.join(directory, sample_name)
try:
os.makedirs(directory)
except OSError as e:
if e.errno != errno.EEXIST:
problem_samples.append(sample_name)
if len(problem_samples) > 0:
message = "An error occurred while creating the directories for the following sample(s): \n\n" + "\n".join(problem_samples)
msg_dlg = wx.MessageDialog(None, message, "Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
else:
return True
def Connect_To_Arduino(self):
"""Finds an Arduino in the list of serial ports and connects to it. If there are
multiple Arduino's a message is created and no connection is made. If there are no
Arduino's a message is created and no connection is made. The serial settings are:
baudrate = 115200
bytesize = 8
parity = serial.PARITY_NONE
stopbits = 1
timeout = 1"""
ports = [tuple(p) for p in list(serial.tools.list_ports_windows.comports())]
ports = {name:description for name, description, value2 in ports}
arduino_port = [name for name, description in ports.items() if re.search(r"Arduino", description)]
if len(arduino_port) == 0:
message = "Could not find Arduino. Please connect the Arduino to run the batch."
msg_dlg = wx.MessageDialog(None, message, "Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
elif len(arduino_port) > 1:
message = "There are too many Arduino's connected. The correct one can not be determined."
msg_dlg = wx.MessageDialog(None, message, "Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
else:
try:
ComPort = serial.Serial(arduino_port[0])
ComPort.baudrate = 115200
ComPort.bytesize = 8
ComPort.parity = serial.PARITY_NONE
ComPort.stopbits = 1
ComPort.timeout = 1
self.ComPort = ComPort
return True
except serial.serialutil.SerialException:
message = "Could not establish connection to Arduino. Check it and try again."
msg_dlg = wx.MessageDialog(None, message, "Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
def Connect_To_NTA(self):
"""Looks for a program with \"NTA 3.3\" in the title and connects to it using
the pywinauto library. If no program is found a message is created."""
windows = pywinauto.findwindows.find_elements()
if not any([re.search(r"NTA 3.3", str(window)) for window in windows]):
message = "The NTA 3.3 program is not started. Please start the program and try again."
msg_dlg = wx.MessageDialog(None, message, "Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
else:
self.NTA_app = Application().connect(title_re = u".*NTA 3.3.*")
return True
def Connect_To_CETAC(self):
"""Looks for a program with \"CETAC Workstation\" in the title and connects to it using
the automation library. If no program is found a message is created."""
windows = pywinauto.findwindows.find_elements()
if not any([re.search(r"CETAC Workstation", str(window)) for window in windows]):
message = "The CETAC Workstation program is not started. Please start the program and try again."
msg_dlg = wx.MessageDialog(None, message, "Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
else:
self.CETAC_app = automation.WindowControl(Name = "CETAC Workstation")
return True
def NTA_Check_Existence(self):
"""Checks to make sure the NTA 3.3 program is still open. Returns True if it is, False if not."""
windows = [str(window) for window in pywinauto.findwindows.find_elements()]
if not any([re.search(r"NTA 3.3", str(window)) for window in windows]):
message = "The NTA 3.3 program is not open."
msg_dlg = wx.MessageDialog(None, message, "Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
else:
return True
def NTA_Set_Filename(self, save_directory, sample_name):
"""Sets the base filename in the NTA 3.3 program according to save_directory and sample_name.
If the samples have individual directories the path is save_directory\\sample_name\\sample_name.
Otherwise it is save_directory\\sample_name."""
if not self.NTA_Check_Existence():
return False
if self.batch_data.samples_have_individual_directories:
file_path = os.path.join(save_directory, sample_name, sample_name)
else:
file_path = os.path.join(save_directory, sample_name)
nta = self.NTA_app["NTA 3.3"]
## Wait for the window to be ready.
time.sleep(2)
## Select the tab with the load script button.
nta.TabControl2.Select(u'SOP')
time.sleep(2)
## Check that tab was selected.
if nta.TabContol2.get_selected_tab() != 0:
message = "Could not select the SOP tab in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Set File Name Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
## Select "Recent Measurements" from the combo box.
nta.TabControl3.Select(u"Recent Measurements")
time.sleep(2)
## Check that the combo box option was selected.
if nta.TableControl3.get_selected_tab() != 0:
message = "Could not select the Recent Measurements tab in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Set File Name Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
## Click the load script button.
nta["..."].click()
time.sleep(2)
## Make sure the click was successful and a save dialog was created.
window_check_result = self.NTA_Window_Check("\'Save As\'", True, 0.2)
if window_check_result == "Abort":
return False
elif window_check_result == "Timed Out":
message = "Could not click the \"...\" button to select a base filename in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Set File Name Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
## File dialog interaction.
window = self.NTA_app["Save As"]
time.sleep(2)
window["Edit"].set_text(file_path)
time.sleep(2)
## Check that the text was edited.
if window["Edit"].texts()[0] != file_path:
message = "Could not enter the file path into the Save As dialog in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Set File Name Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
window = automation.WindowControl(Name = "Save As")
window.SetActive(waitTime=1)
window.ButtonControl(Name = "Save").Click()
time.sleep(3)
## Make sure the window closed after clicking open.
## If it did not then something went wrong.
window_check_result = self.NTA_Window_Check("\'Save As\'", False, 0.2)
if window_check_result == "Abort":
return False
elif window_check_result == "Timed Out":
message = "Could not click the Save button in the Save As dialog in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Set File Name Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
return True
def NTA_Load_Script(self, script_filepath):
"""Loads the script indicated by script_filepath in the NTA 3.3 program."""
if not self.NTA_Check_Existence():
return False
nta = self.NTA_app["NTA 3.3"]
## Wait for the window to be ready.
time.sleep(2)
## Select the tab with the load script button.
nta.TabControl2.Select(u'SOP')
time.sleep(2)
## Check that tab was selected.
if nta.TabContol2.get_selected_tab() != 0:
message = "Could not select the SOP tab in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Load Script Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
## Select "Recent Measurements" from the combo box.
nta.TabControl3.Select(u"Recent Measurements")
time.sleep(2)
## Check that the combo box option was selected.
if nta.TableControl3.get_selected_tab() != 0:
message = "Could not select the Recent Measurements tab in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Set File Name Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
## Click the load script button.
nta["Load Script"].click()
time.sleep(2)
## Make sure the click was successful and an Open dialog was created.
window_check_result = self.NTA_Window_Check("\'Open\'", True, 0.2)
if window_check_result == "Abort":
return False
elif window_check_result == "Timed Out":
message = "Could not load a script in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Load Script Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
## File dialog interaction.
window = self.NTA_app["Open"]
time.sleep(2)
window["Edit"].set_text(script_filepath)
time.sleep(2)
## Check that the text was edited.
if window["Edit"].texts()[0] != script_filepath:
message = "Could not enter the file path into the Open dialog in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Load Script Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
window = automation.WindowControl(Name = "Open")
window.SetActive(waitTime=1)
window.SplitButtonControl(Name = "Open").Click()
time.sleep(3)
## Make sure the window closed after clicking open.
## If it did not then something went wrong.
window_check_result = self.NTA_Window_Check("\'Open\'", False, 0.2)
if window_check_result == "Abort":
return False
elif window_check_result == "Timed Out":
message = "Could not load a script in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Load Script Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
return True
def NTA_Run_Script(self):
"""Clicks the Run button to run a script in the NTA 3.3 program. This is
the button located in the SOP tab under Recent Measurements, not to be
confused with the button on the script panel. If a Warning dialog appears
after clicking run it is assumed that it is the superfluous one asking about
overwriting files and clicks Yes."""
if not self.NTA_Check_Existence():
return False
nta = self.NTA_app["NTA 3.3"]
## Wait for the window to be ready.
time.sleep(2)
## Select the tab with the load script button.
nta.TabControl2.Select(u'SOP')
time.sleep(2)
## Check that tab was selected.
if nta.TabContol2.get_selected_tab() != 0:
message = "Could not select the SOP tab in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Run Script Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
## Select "Recent Measurements" from the combo box.
nta.TabControl3.Select(u"Recent Measurements")
time.sleep(2)
## Check that the combo box option was selected.
if nta.TableControl3.get_selected_tab() != 0:
message = "Could not select the Recent Measurements tab in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Set File Name Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
## Click the run button.
nta["Run"].click()
time.sleep(2)
## At time of writing there is no way to confirm that the script is running.
## Close any warning message that may appear such as from the camerasettingmessage command.
window_check_result = self.NTA_Window_Check("\'Warning\'", False, 0.2)
if window_check_result == "Abort":
return False
elif window_check_result == "Timed Out":
window = automation.WindowControl(Name = "Warning")
window.SetActive(waitTime=1)
window.ButtonControl(Name = "Yes").Click()
time.sleep(2)
window_check_result = self.NTA_Window_Check("\'Warning\'", False, 0.2)
if window_check_result == "Abort":
return False
elif window_check_result == "Timed Out":
message = "Could not close the warning dialog in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Run Script Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
return True
def NTA_Open_Experiment(self, save_directory, sample_name, timeout=2):
"""Opens the experiment given by save_directory and sample_name in the NTA 3.3 program.
If samples have individual directories the file path is save_directory\\sample_name\\sample_name.nano.
Otherwise it is save_directory\\sample_name.nano.
Also checks the video file names loaded in Current Experiment and waits until they
are similar to sample_name. Will check for up to timeout minutes."""
if not self.NTA_Check_Existence():
return False
nta = self.NTA_app["NTA 3.3"]
## Wait for the window to be ready.
time.sleep(2)
## Select the analysis tab.
nta.TabControl2.Select(u'Analysis')
time.sleep(2)
## Check that tab was selected.
if nta.TabContol2.get_selected_tab() != 2:
message = "Could not select the Analysis tab in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Open Experiment Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()
msg_dlg.Destroy()
return False
## Select current experiment tab.
nta.TabControl4.Select(u"Current Experiment")
time.sleep(2)
## Check that tab was selected.
if nta.TabContol4.get_selected_tab() != 1:
message = "Could not select the Current Experiment tab in NTA. Batch aborted."
msg_dlg = wx.MessageDialog(None, message, "NTA Open Experiment Error", wx.OK | wx.ICON_ERROR)
msg_dlg.ShowModal()