This repository has been archived by the owner on Aug 29, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
calc_emit.py
158 lines (123 loc) · 4.91 KB
/
calc_emit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# encoding: utf-8
"""
Berechnung der RMS Emittanzen anhand der FWHM Breiten aus der
Dreigitter-Prozedur.
Usage:
calc_emit.py <DATA_FOLDER> <MADX_MODEL_FILE> <MADX_SEQUENCE_NAME> <OUTPUT_FILE>
"""
from __future__ import unicode_literals
from __future__ import division
from __future__ import print_function
import os
import sys
from math import sqrt, log
import numpy as np
# need cpymad installed:
from cpymad.madx import Madx
# imported from this folder:
from emit_math import calc_emit
def makedirs(path):
"""py2 compatibility function for ``os.makedirs(path, exist_ok=True)``."""
try:
os.makedirs(path)
except OSError: # no exist_ok on py2
pass
def init_madx(files):
"""Start MAD-X instance and initialize with the given files."""
madx = Madx(stdout=False)
for f in files:
madx.call(f, chdir=True)
return madx
def get_sectormaps(twiss, elems, files):
"""
Start MAD-X instance and compute sectormaps between the elements.
:param dict twiss: arguments for TWISS, must include ``sequence``
:param list elems: monitor names, must be sorted according to
occurence in the sequence!
:param list files: initialization files for MAD-X
:returns: the sectormaps between the individual monitors
"""
return init_madx(files).sectormap(elems, **twiss)
def parse_device_export(filename):
"""
Parse beam position and FWHM from pseudo .CSV file generated by the
"Laufender export" functionality in the control system.
"""
data = {}
with open(filename, 'rb') as f:
for line in f:
parts = line.decode('latin1').split(';')
if len(parts) == 2:
data[parts[0].strip()] = parts[1].strip()
# we only need the summary from the <HEADER/> and <CUSTOM/>
# blocks - and can ignore the raw measurements that follow:
if line == b'</CUSTOM>\n':
break
mefi = data['Mefi'].split()
assert mefi[0][0] == 'E'
assert mefi[1][0] == 'F'
assert mefi[2][0] == 'I'
assert mefi[3][0] == 'G'
fwhm_to_rms = 2*sqrt(2*log(2))
return {
'device': data['Gerät'].lower(),
'mefi': (int(data['VAcc ID']),
int(mefi[0][1:]),
int(mefi[1][1:]),
int(mefi[2][1:]),
int(mefi[3][1:])),
'tint': float(data['Integrationszeit [s]']),
'posx': float(data['Schwerpunkt X']),
'posy': float(data['Schwerpunkt Y']),
'envx': float(data['FWHM X']) / 1000 / fwhm_to_rms,
'envy': float(data['FWHM Y']) / 1000 / fwhm_to_rms,
}
def main(data_folder, madx_file, seq_name, output_file='results.txt'):
# read all valid measurements:
all_records = {}
for dirpath, dirnames, filenames in os.walk(data_folder):
for filename in filenames:
data = parse_device_export(os.path.join(dirpath, filename))
if data['envx'] != -9999.0 and data['envy'] != -9999.0:
all_records\
.setdefault(data['mefi'], {})\
.setdefault(data['device'], [])\
.append(data)
# average measurements for same mefi setting and device
averaged = {
mefi: {
device: {key: np.mean([item[key] for item in items])
for key in ('posx', 'posy', 'envx', 'envy')}
for device, items in devices.items()
} for mefi, devices in all_records.items()
}
# get a sorted list of monitors
sequence = init_madx([madx_file]).sequences[seq_name]
elements = set(dev for devs in averaged.values() for dev in devs)
elements = sorted(elements, key=sequence.elements.index)
del sequence
# TODO: initialize beam with correct particle + energy (?)
# NOTE: initial coordinates X=0:
twiss = dict(sequence=seq_name, betx=1, bety=1)
f = open(output_file, 'wt', 1)
print("# vacc energy focus intensity gantry ex ey pt alfx alfy betx bety", file=f)
for mefi, devices in averaged.items():
basename = 'M{}-E{}-F{}-I{}-G{}'.format(*mefi)
strengths = os.path.join('params', basename + '.str')
sectormaps = get_sectormaps(twiss, elements, [madx_file, strengths])
measurements = [devices[el] for el in elements]
results = calc_emit(measurements, sectormaps,
calc_long=True, calc_4D=False)
twiss_init = (results['ex'], results['ey'], results['pt'],
results['alfx'], results['alfy'],
results['betx'], results['bety'])
M, E, F, I, G = mefi
chn = format_channel
print(chn(M, 2), chn(E, 3), chn(F, 2), chn(I, 2), chn(G, 3),
*map(format_float, twiss_init), file=f)
def format_channel(num, width):
return '{:>3}'.format(num)
def format_float(num):
return '{:>12}'.format('{:+.5e}'.format(num))
if __name__ == '__main__':
sys.exit(main(*sys.argv[1:]))