Skip to content

Commit

Permalink
Split ascii file reading to its own function
Browse files Browse the repository at this point in the history
  • Loading branch information
tukiains committed Jun 16, 2023
1 parent 7340a72 commit 97b3791
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 179 deletions.
256 changes: 77 additions & 179 deletions mwrpy/level2/get_ret_coeff.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,177 +24,13 @@ def get_mvr_coeff(site: str, prefix: str, freq: np.ndarray):
c_list = get_coeff_list(site, prefix)
coeff: dict = {}

if (str(c_list[0][-3:]).lower() == "ret") & (len(c_list) == 1):
with open(c_list[0], "r", encoding="utf8") as f:
lines = f.readlines()
lines = [line.rstrip("\n") for line in lines]
line_count = len(lines)
line_num = -1
while line_num < line_count - 1:
line_num += 1
line = lines[line_num]
if ("=" in line) & (line[0] not in ("#", ":")):
if "#" in line:
line = line.split("#")[0]
name, tmp = line.split("=")
if name not in ("SL", "SQ"):
if not tmp.strip()[0].isalpha():
value = np.array(
[float(idx) for idx in tmp.split()], np.float32
)
if name == "NS":
name_list = [
"input_offset",
"input_scale",
"output_offset",
"output_scale",
]
for split_name in name_list:
if split_name in coeff:
coeff[split_name] = np.vstack(
(coeff[split_name], value)
)
else:
coeff[split_name] = value
if split_name != "output_scale":
line_num += 1
_, tmp = lines[line_num].split(":")
value = np.array(
[float(idx) for idx in tmp.split()], np.float32
)
line_num -= 1
elif name == "W1":
w1_stack = value
while lines[line_num + 1][0:2] != "W2":
line_num += 1
_, tmp = lines[line_num].split(":")
tmp_splitted = tmp.split()
value = np.array(
[float(tmp_splitted[idx]) for idx in range(len(value))],
np.float32,
)
w1_stack = np.vstack((w1_stack, value))
if name in coeff:
if coeff[name].ndim == 3:
coeff[name] = np.concatenate(
(coeff[name], w1_stack[:, :, np.newaxis]), axis=2
)
else:
coeff[name] = np.stack((coeff[name], w1_stack), axis=2)
else:
coeff[name] = w1_stack
elif name == "W2":
w2_stack = value
for _ in range(len(coeff["AL"]) - 1):
line_num += 1
_, tmp = lines[line_num].split(":")
tmp_splitted = tmp.split()
value = np.array(
[float(tmp_splitted[idx]) for idx in range(len(value))],
np.float32,
)
w2_stack = np.vstack((w2_stack, value))
if name in coeff:
if coeff[name].ndim == 3:
coeff[name] = np.concatenate(
(coeff[name], w2_stack[:, :, np.newaxis]), axis=2
)
elif (coeff[name].ndim == 2) & (w2_stack.ndim == 1):
coeff[name] = np.concatenate(
(coeff[name], w2_stack[np.newaxis, :]), axis=0
)
elif (coeff[name].ndim == 2) & (w2_stack.ndim == 2):
coeff[name] = np.stack((coeff[name], w2_stack), axis=2)
else:
coeff[name] = np.vstack((coeff[name], w2_stack))
else:
coeff[name] = w2_stack
elif name == "RM":
rm_stack = value
for _ in range(len(coeff["AL"]) - 1):
line_num += 1
_, tmp = lines[line_num].split(":")
tmp_splitted = tmp.split()
rm_stack = np.vstack((rm_stack, float(tmp_splitted[0])))
if name in coeff:
if (coeff[name].ndim > 1) & (rm_stack.ndim > 1):
coeff[name] = np.concatenate(
(coeff[name], rm_stack), axis=1
)
elif (coeff[name].ndim > 1) & (rm_stack.ndim == 1):
coeff[name] = np.concatenate(
(coeff[name], rm_stack[np.newaxis, :]), axis=0
)
else:
coeff[name] = np.vstack((coeff[name], rm_stack))
else:
coeff[name] = rm_stack
elif name == "OS":
if "AL" not in coeff:
coeff["AL"] = [0]
os_stack = value
for _ in range(len(coeff["AL"]) - 1):
line_num += 1
_, tmp = lines[line_num].split(":")
tmp_splitted = tmp.split()
os_stack = np.vstack((os_stack, float(tmp_splitted[0])))
if name in coeff:
coeff[name] = np.concatenate(
(coeff[name], os_stack), axis=1
)
else:
coeff[name] = os_stack
elif name == "TL":
tl_stack = value
for _ in range(len(coeff["AL"]) - 1):
line_num += 1
_, tmp = lines[line_num].split(":")
tmp_splitted = tmp.split()
if prefix == "tpb":
value = np.array(
[
float(tmp_splitted[idx])
for idx in range(len(coeff["AG"]))
],
np.float32,
)
else:
value = np.array(
[
float(tmp_splitted[idx])
for idx in range(len(coeff["FR"]))
],
np.float32,
)
tl_stack = np.vstack((tl_stack, value))
if name in coeff:
coeff[name] = np.concatenate(
(coeff[name], tl_stack), axis=1
)
else:
coeff[name] = tl_stack
elif name == "TQ":
tq_stack = value
for _ in range(len(coeff["AL"]) - 1):
line_num += 1
_, tmp = lines[line_num].split(":")
tmp_splitted = tmp.split()
value = np.array(
[
float(tmp_splitted[idx])
for idx in range(len(coeff["FR"]))
],
np.float32,
)
tq_stack = np.vstack((tq_stack, value))
coeff[name] = tq_stack
else:
if name in coeff:
coeff[name] = np.vstack((coeff[name], value))
else:
coeff[name] = value

f.close()
if (str(c_list[0][-3:]).lower() == "ret") and (len(c_list) == 1):
coeff = read_coeff_ascii(c_list)
if prefix == "tpb":
for key in ("W1", "W2"):
coeff[key] = coeff[key].squeeze(axis=2)
for key in ("input_offset", "input_scale", "output_offset", "output_scale"):
coeff[key] = coeff[key].squeeze(axis=0)

aux = [
"TS",
Expand All @@ -212,7 +48,7 @@ def get_mvr_coeff(site: str, prefix: str, freq: np.ndarray):
coeff[aux_i] = 0
coeff["FR_BL"] = coeff["FR"]

elif (str(c_list[0][-2:]).lower() == "nc") & (len(c_list) > 0):
elif (str(c_list[0][-2:]).lower() == "nc") and (len(c_list) > 0):
coeff["RT"] = Fill_Value_Int
N = len(c_list)

Expand Down Expand Up @@ -363,13 +199,6 @@ def f_quad(_x):
return np.empty(0)

elif coeff["RT"] == 2:
if len(coeff["AG"]) == 1:
coeff["W1"] = coeff["W1"][:, :, np.newaxis]
coeff["W2"] = coeff["W2"][:, :, np.newaxis]
coeff["input_scale"] = coeff["input_scale"][np.newaxis, :]
coeff["input_offset"] = coeff["input_offset"][np.newaxis, :]
coeff["output_scale"] = coeff["output_scale"][np.newaxis, :]
coeff["output_offset"] = coeff["output_offset"][np.newaxis, :]

def input_scale(x):
return np.array(
Expand Down Expand Up @@ -456,3 +285,72 @@ def factor(x):
factor,
)
)


def read_coeff_ascii(c_list: list) -> dict:
    """Read retrieval coefficients from an ASCII coefficient file.

    Only the first entry of ``c_list`` is opened. Every line that has an
    ``=`` within its first three characters is treated as a ``KEY=`` entry,
    where the key is the first two characters of the line. All keys except
    ``NS`` are parsed with ``_parse_lines``; the ``NS`` entries are handled
    by ``_read_ns`` and merged in last, so they win on a name clash.

    Args:
        c_list: List whose first element is the path of the file to read.

    Returns:
        Dictionary mapping each two-character key (plus the names produced
        by ``_read_ns``) to the parsed values.
    """
    with open(c_list[0], "r", encoding="utf8") as f:
        lines = f.readlines()

    coeff: dict = {}
    for line in lines:
        if "=" not in line[:3]:
            continue
        key = line[:2]
        # _parse_lines already gathers *every* occurrence of the key from the
        # whole file, so repeating the call for each repeated "KEY=" line
        # would only redo identical work (quadratic in the number of blocks).
        if key == "NS" or key in coeff:
            continue
        coeff[key] = _parse_lines(f"{key}=", lines)
    return {**coeff, **_read_ns(lines)}


def _parse_lines(prefix: str, lines: list) -> np.ndarray:
    """Gather all values stored under ``prefix`` and return them as an array.

    Each line starting with ``prefix`` opens a block; the lines directly
    after it that start with ``:`` are continuation rows of the same block.
    Values are converted to ``float32`` when possible and kept as strings
    otherwise.

    Args:
        prefix: Text a line must start with to open a block, e.g. ``"W1="``.
        lines: All lines of the coefficient file.

    Returns:
        Array of the parsed values. When continuation rows were found, the
        rows are reshaped to ``(blocks, rows_per_block, columns)`` and then
        transposed to ``(rows_per_block, columns, blocks)``. A leading
        length-one axis of a 2-D result and a length-one middle axis of a
        3-D result are squeezed away.
    """
    rows = []
    n_blocks = 0
    has_continuation = False
    n_lines = len(lines)
    for start, text in enumerate(lines):
        if not text.startswith(prefix):
            continue
        n_blocks += 1
        rows.append(_split_line(text))
        # Consume the run of ":"-prefixed rows that directly follows.
        cursor = start + 1
        while cursor < n_lines and lines[cursor].startswith(":"):
            rows.append(_split_line(lines[cursor]))
            has_continuation = True
            cursor += 1

    # A lone single-valued row is unwrapped by one level before conversion.
    is_single_value = (
        len(rows) == 1 and isinstance(rows[0], list) and len(rows[0]) == 1
    )
    values: list[str] | list[list[str]] = rows[0] if is_single_value else rows

    try:
        array = np.array(values).astype(np.float32)
    except ValueError:
        # Non-numeric content is kept as text.
        array = np.array(values).astype(str)

    if has_continuation:
        n_columns = array.shape[1]
        array = np.reshape(array, (n_blocks, -1, n_columns))
        array = np.transpose(array, (1, 2, 0))
    if array.ndim == 2 and array.shape[0] == 1:
        array = np.squeeze(array)
    if array.ndim == 3 and array.shape[1] == 1:
        array = np.squeeze(array, axis=1)
    return array


def _read_ns(lines: list) -> dict:
    """Read the four-line blocks that start with ``NS=``.

    For every line starting with ``NS=``, that line and the three lines
    following it are split with ``_split_line`` and appended, in order, to
    ``input_offset``, ``input_scale``, ``output_offset`` and
    ``output_scale``.

    Args:
        lines: All lines of the coefficient file.

    Returns:
        Dictionary with the four names above, each mapped to a ``float32``
        array holding one row per ``NS=`` block found.
    """
    # Order matters: the n-th name takes the n-th line of each block.
    names = ("input_offset", "input_scale", "output_offset", "output_scale")
    collected: dict = {name: [] for name in names}
    for start, text in enumerate(lines):
        if not text.startswith("NS="):
            continue
        for offset, name in enumerate(names):
            collected[name].append(_split_line(lines[start + offset]))
    return {
        name: np.array(rows).astype(np.float32)
        for name, rows in collected.items()
    }


def _split_line(line: str) -> list[str]:
    """Return the whitespace-separated values of one coefficient-file line.

    The values are the text between the first delimiter and the next
    delimiter or ``#`` comment, whichever comes first. The delimiter is
    ``:`` when the non-comment part of the line contains a colon
    (continuation rows), otherwise ``=`` (``KEY=`` rows).

    Args:
        line: A single line of the file, with or without a trailing newline.

    Returns:
        The values as strings; an empty list when nothing follows the
        delimiter.

    Raises:
        IndexError: If the chosen delimiter does not occur in ``line``.
    """
    # Only text in front of a "#" comment may decide the delimiter. Otherwise
    # a colon inside a comment ("W1= 1 2 # note: ...") would make the line be
    # split on that colon and return comment text instead of the values.
    payload = line.split("#", 1)[0]
    delimiter = ":" if ":" in payload else "="
    return line.split(delimiter)[1].split("#")[0].split()
9 changes: 9 additions & 0 deletions tests/test_get_ret_coeff.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ def test_coefficients():

for key, item in test_data.items():
data = get_mvr_coeff(SITE, key, FREQ)
if key == "lwp":
expected = np.array(
[-107.37779, -30.645275, -76.23844, 0.9421638, -27.80052],
dtype=np.float32,
)
assert_array_almost_equal(data[0]["W1"].flatten()[:5], expected)
for name, value in data[0].items():
shape: tuple
if isinstance(value, str):
Expand All @@ -142,6 +148,7 @@ def test_coefficients():
else:
first, last, mean = item[name][0]
shape = item[name][1]
# print(key, name, first, last, mean, shape)
_check(value, float(first), float(last), float(mean), shape=shape)


Expand All @@ -159,6 +166,8 @@ def _check(
first_value = data[0, 0, 0]
last_value = data[-1, -1, -1]

if isinstance(first_value, str):
return
assert_array_almost_equal(first_value, first, decimal=4)
assert_array_almost_equal(last_value, last, decimal=4)
assert_array_almost_equal(np.mean(data), mean_value, decimal=4)
Expand Down

0 comments on commit 97b3791

Please sign in to comment.