Update ATF importer #552

Open · wants to merge 35 commits into base: master from atf-import-update

Changes shown are from 25 of the 35 commits.

Commits:
68d54f7: Refactor atf importer (WiP) (khoidt, May 16, 2024)
e3e058a: Update ebl/atf_importer/application/lemma_lookup.py (khoidt, May 16, 2024)
ab33279: Update ebl/atf_importer/application/lemma_lookup.py (khoidt, May 16, 2024)
f6bed68: Update ebl/atf_importer/application/atf_importer_base.py (khoidt, May 16, 2024)
44f2daf: Update ebl/atf_importer/application/atf_importer_base.py (khoidt, May 16, 2024)
a3e9e52: Update ebl/atf_importer/domain/atf_preprocessor_cdli.py (khoidt, May 16, 2024)
43a9cfa: Update ebl/atf_importer/domain/atf_preprocessor_cdli.py (khoidt, May 16, 2024)
860ad08: Update ebl/atf_importer/domain/atf_preprocessor_cdli.py (khoidt, May 16, 2024)
171ebac: Fix lark paths (khoidt, May 16, 2024)
65aa777: Update test (khoidt, May 16, 2024)
0ffb31e: Refactor & update (khoidt, May 17, 2024)
72813c8: Clean up (khoidt, May 17, 2024)
6bdc503: Refactor more (khoidt, May 17, 2024)
b50e5b9: Update (khoidt, May 17, 2024)
08f4430: Fix type (khoidt, May 21, 2024)
ac21caa: Improve (khoidt, May 21, 2024)
4f53377: Improve (khoidt, May 21, 2024)
cf568d2: Update & fix preprocessor tests (khoidt, May 23, 2024)
9c19bc4: Refactor & update (khoidt, May 23, 2024)
115c5f6: Fix test (use transliteration chars) (khoidt, May 23, 2024)
0426000: Improve (khoidt, May 23, 2024)
7464010: Fix glossary data (WiP) (khoidt, May 24, 2024)
8d03672: Update, improve & refactor to fix test (WiP) (khoidt, May 27, 2024)
b1c8081: Update, refactor & add logging (WiP) (khoidt, May 28, 2024)
a7e070a: Update logging & improve (khoidt, May 29, 2024)
e662c72: Refactor (khoidt, May 31, 2024)
fb84e2c: Update logging (khoidt, May 31, 2024)
e39e8b3: Update preprocessor & add importer test (WiP) (khoidt, Jun 3, 2024)
14be543: Update atf preprocessor (WiP) (khoidt, Jun 4, 2024)
18a1ef7: Fix (khoidt, Jun 4, 2024)
3c50de9: Update structure, use only ebl atf parser (WiP) (khoidt, Jun 12, 2024)
1cea5ff: Refactor, update & fix tests (WiP) (khoidt, Jul 4, 2024)
172631a: Update (WiP) (khoidt, Jul 10, 2024)
b2a0405: Refactor & fix tests (WiP) (khoidt, Oct 15, 2024)
4e4693f: Merge remote-tracking branch 'origin/master' into atf-import-update (khoidt, Oct 15, 2024)
694 changes: 94 additions & 600 deletions ebl/atf_importer/application/atf_importer.py

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions ebl/atf_importer/application/atf_importer_config.py
@@ -0,0 +1,35 @@
import json
from typing import Dict, List, TypedDict, Union, Literal, Optional


class CliArgKwargs(TypedDict, total=False):
required: bool
help: str
default: Optional[str]
choices: Optional[List[str]]


class CliArgument(TypedDict):
flags: List[str]
kwargs: CliArgKwargs


class AtfImporterConfigData(TypedDict):
STYLES: Dict[int, str]
POS_TAGS: List[str]
NOUN_POS_TAGS: List[str]
CLI_ARGS: List[CliArgument]


class AtfImporterConfig:
config_data: AtfImporterConfigData
config_path = "ebl/atf_importer/domain/atf_importer_config.json"

def __init__(self):
with open(self.config_path, "r") as file:
self.config_data: AtfImporterConfigData = json.load(file)

def __getitem__(
self, item: Literal["STYLES", "POS_TAGS", "NOUN_POS_TAGS", "CLI_ARGS"]
    ) -> Union[Dict[int, str], List[str], List[CliArgument]]:
        return self.config_data[item]  # TypedDicts are plain dicts at runtime: index them, don't getattr
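
For orientation, a minimal usage sketch of the config class above (not part of the diff; the example values are illustrative assumptions, not the actual contents of atf_importer_config.json):

from ebl.atf_importer.application.atf_importer_config import AtfImporterConfig

# Loads ebl/atf_importer/domain/atf_importer_config.json on construction and
# exposes its four sections via __getitem__.
config = AtfImporterConfig()
pos_tags = config["POS_TAGS"]  # e.g. ["N", "V", "AJ"] -- illustrative values only
cli_args = config["CLI_ARGS"]  # entries of the form {"flags": [...], "kwargs": {...}}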
167 changes: 167 additions & 0 deletions ebl/atf_importer/application/database_importer.py
@@ -0,0 +1,167 @@
from typing import Dict, List, Tuple, Optional, Sequence
from ebl.app import create_context
from ebl.fragmentarium.application.fragment_updater import FragmentUpdater
from ebl.fragmentarium.application.transliteration_update_factory import (
TransliterationUpdateFactory,
)
from ebl.fragmentarium.web.dtos import parse_museum_number
from ebl.lemmatization.domain.lemmatization import Lemmatization, LemmatizationToken
from ebl.transliteration.domain.atf import Atf
from ebl.transliteration.domain.lark_parser import parse_atf_lark
from ebl.users.domain.user import AtfImporterUser
from ebl.atf_importer.domain.atf_preprocessor_util import Util


class DatabaseImporter:
def __init__(self, database, logger, username: str):
self.database = database
self.logger = logger
self.user = AtfImporterUser(username)
context = create_context()
self.transliteration_factory: TransliterationUpdateFactory = (
context.get_transliteration_update_factory()
)
self.updater: FragmentUpdater = context.get_fragment_updater()

def import_into_database(self, ebl_lines: Dict[str, List], filename: str):
museum_number: Optional[str] = self._retrieve_museum_number(ebl_lines, filename)
if not museum_number:
self.logger.error(
f"{filename} could not be imported: Museum number not found",
"not_imported_files",
)
self.logger.info(Util.print_frame(f'Conversion of "{filename}.atf" failed'))
return
if self._check_fragment_exists(museum_number):
self._import(ebl_lines, museum_number, filename)

def _import(self, ebl_lines: Dict[str, List], museum_number: str, filename: str):
try:
self._insert_transliterations(
ebl_lines["transliteration"],
museum_number,
)
self._insert_lemmatization(ebl_lines["lemmatization"], museum_number)
            self.logger.info(f"{filename} successfully imported", "imported_files")
except Exception as e:
self.logger.error(
f"Error importing {filename}: {str(e)}", "not_imported_files"
)

def _get_valid_museum_number_or_none(
self, museum_number_string: str
) -> Optional[str]:
[Code scanning / CodeQL check notice on lines +51 to +53: "Explicit returns mixed with implicit (fall-through) returns." Mixing implicit and explicit returns may indicate an error, as implicit returns always return None.]
try:
parse_museum_number(museum_number_string)
self.logger.info(f"Museum number '{museum_number_string}' is valid")
return museum_number_string
except ValueError:
return

def _retrieve_museum_number(
self, ebl_lines: Dict[str, List], filename: str
) -> Optional[str]:
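        # Resolution order: museum number via CDLI number lookup in the
        # fragments collection, then the "<...> = <museum number>" part of a
        # control line, then interactive input as a last resort.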
if museum_number := self._get_museum_number_by_cdli_number(
ebl_lines["control_lines"]
):
return museum_number
for line in ebl_lines["control_lines"]:
linesplit = line["c_line"].split("=")
if len(linesplit) > 1 and (
museum_number := self._get_valid_museum_number_or_none(
linesplit[-1].strip()
)
):
return museum_number
self.logger.error(f"Could not find a valid museum number in '{filename}'")
return self._input_museum_number(filename)

def _input_museum_number(
self, filename: str, museum_number: Optional[str] = None
) -> Optional[str]:
while museum_number is None:
museum_number_input = input(
"Please enter a valid museum number (enter 'skip' to skip this file): "
)
if museum_number_input.lower() == "skip":
return None
museum_number = self._get_valid_museum_number_or_none(museum_number_input)
return museum_number

def _get_museum_number_by_cdli_number(self, control_lines) -> Optional[str]:
if cdli_number := self._get_cdli_number(control_lines):
for entry in self.database.get_collection("fragments").find(
{"externalNumbers.cdliNumber": cdli_number}, {"museumNumber"}
):
if "_id" in entry.keys():
return entry["_id"]
self.logger.warning(
f"No museum number to CDLI number '{cdli_number}' found."
" Trying to parse from the original file..."
)
return None

@staticmethod
def _get_cdli_number(control_lines) -> Optional[str]:
for line in control_lines:
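            # Note: returns on the first iteration, so only the first control line is inspected.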
cdli_number = line["c_line"].split("=")[0].strip().replace("&", "")
return cdli_number
return None

def _check_fragment_exists(self, museum_number: str) -> bool:
exists = list(
self.database.get_collection("fragments").find(
{"museumNumber": museum_number}, {"text.lines.0"}
)
)
return bool(exists)

def _insert_transliterations(
self,
transliterations: List[str],
museum_number: str,
) -> None:
converted_transliteration = "\n".join(transliterations)
transliteration = self.transliteration_factory.create(
Atf(converted_transliteration)
)
self.updater.update_transliteration(
parse_museum_number(museum_number), transliteration, self.user
)

def _insert_lemmatization(
self,
lemmatizations: List[Tuple[str, List[Dict]]],
museum_number: str,
):
lemmatization_tokens = self._get_lemmatization_tokens(lemmatizations)
lemmatization = Lemmatization([lemmatization_tokens])
self.updater.update_lemmatization(
parse_museum_number(museum_number), lemmatization, self.user
)

def _get_lemmatization_tokens(
self, lemmatizations: List[Tuple[str, List[Dict]]]
) -> Sequence[LemmatizationToken]:
lemmatization_tokens: List[LemmatizationToken] = []
for text_line, lemmas in lemmatizations:
ebl_lines = parse_atf_lark(text_line).lines[0].content
lemmatization_tokens = self._get_lemmatization_tokens_in_lines(
ebl_lines, lemmas, lemmatization_tokens
)
return lemmatization_tokens

def _get_lemmatization_tokens_in_lines(
self,
ebl_lines,
lemmas,
lemmatization_tokens: List[LemmatizationToken],
) -> List[LemmatizationToken]:
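        # Match lemmas to tokens by surface value; tokens with no match get a
        # LemmatizationToken whose lemma ids are None.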
for token in ebl_lines:
lemma_ids = [
lemma["_id"] for lemma in lemmas if lemma["lemma"] == token.value
]
lemmatization_tokens.append(
LemmatizationToken(token.value, tuple(lemma_ids) if lemma_ids else None)
)
return lemmatization_tokens
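
A hedged end-to-end sketch of the importer above (not part of the diff; the Mongo connection details, logger stand-in, and payload values are illustrative assumptions, and a configured eBL environment is required because __init__ calls create_context()):

from pymongo import MongoClient
from ebl.atf_importer.application.database_importer import DatabaseImporter

class _Logger:
    # Minimal stand-in for the importer's logger interface (an assumption).
    def info(self, message, *tags): print("INFO:", message)
    def error(self, message, *tags): print("ERROR:", message)
    def warning(self, message, *tags): print("WARN:", message)

database = MongoClient("mongodb://localhost:27017")["ebl"]  # assumed connection details
importer = DatabaseImporter(database, _Logger(), "atf-importer-user")

# ebl_lines mirrors what import_into_database expects: control lines of the
# form "&<CDLI number> = <museum number>", plus transliteration lines and
# per-line lemmatization pairs; all values here are made up.
ebl_lines = {
    "control_lines": [{"c_line": "&P123456 = VAT 00001"}],
    "transliteration": ["1. a-na be-li₂-ia"],
    "lemmatization": [("1. a-na be-li₂-ia", [])],
}
importer.import_into_database(ebl_lines, "example_file")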
117 changes: 117 additions & 0 deletions ebl/atf_importer/application/glossary_parser.py
@@ -0,0 +1,117 @@
import re
from typing import Dict, List, Tuple, Optional, Iterator, TypedDict
from ebl.atf_importer.application.atf_importer_config import AtfImporterConfigData


class GlossaryParserData(TypedDict):
lemgwpos_cf: Dict[str, str]
forms_senses: Dict[str, List[str]]
lemposgw_cfgw: Dict[str, Tuple[str, str]]


class GlossaryParser:
def __init__(self, config: AtfImporterConfigData):
self.config = config
self.lemgwpos_cf: Dict[str, str] = {}
self.forms_senses: Dict[str, List[str]] = {}
self.lemposgw_cfgw: Dict[str, Tuple[str, str]] = {}

@property
def data(self) -> GlossaryParserData:
return {
"lemgwpos_cf": self.lemgwpos_cf,
"forms_senses": self.forms_senses,
"lemposgw_cfgw": self.lemposgw_cfgw,
}

def parse(self, file: Iterator[str]) -> GlossaryParserData:
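        # Dispatch on the glossary markers: @entry opens a new headword,
        # @form collects the lemmas of its forms, and @sense attaches senses
        # to the collected lemmas.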
current_entry: Dict[str, str] = {}
lemmas: List[str] = []
for line in file:
line = line.strip()
if line.startswith("@entry"):
lemmas, current_entry = self._handle_entry(line, lemmas)
elif line.startswith("@form"):
lemmas = self._handle_form(line, current_entry, lemmas)
elif line.startswith("@sense"):
self._handle_sense(line, lemmas, current_entry)
return self.data

def _handle_entry(
self, line: str, lemmas: List[str]
) -> Tuple[List[str], Dict[str, str]]:
lemmas.clear()
return lemmas, self._parse_entry(line)

def _handle_form(
self, line: str, current_entry: Dict[str, str], lemmas: List[str]
) -> List[str]:
lemma = self._parse_form(line, current_entry)
if lemma:
lemmas.append(lemma)
return lemmas

def _handle_sense(
self, line: str, lemmas: List[str], current_entry: Dict[str, str]
) -> None:
self._parse_sense(line, lemmas, current_entry)

def _parse_entry(self, line: str) -> Dict[str, str]:
entry = {}
parts = line.split(" ", 2)
if len(parts) > 1:
entry["cf"] = parts[1].replace("ʾ", "'").strip()
description = parts[2] if len(parts) > 2 else ""
if match := re.search(r"\[(.*?)\] (.*)", description):
entry["gw"], entry["pos"] = match.groups()
entry["gw"] = entry["gw"].strip()
entry["pos"] = entry["pos"].strip()
return entry

def _parse_form(self, line: str, current_entry: Dict[str, str]) -> Optional[str]:
parts = line.split(" ")
if len(parts) > 2:
lemma = parts[2].lstrip("$").rstrip("\n")
if (
"cf" in current_entry
and "gw" in current_entry
and "pos" in current_entry
):
key = f"{lemma}{current_entry['pos']}{current_entry['gw']}"
self.lemgwpos_cf[key] = current_entry["cf"]
return lemma
return None

def _parse_sense(
self, line: str, lemmas: List[str], current_entry: Dict[str, str]
) -> None:
pos_tag, sense = self._extract_pos_tag_and_sense(line)
for lemma in lemmas:
self._update_forms_senses(lemma, sense)
self._update_lemposgw_cfgw(lemma, pos_tag, sense, current_entry)

def _extract_pos_tag_and_sense(
self, line: str
) -> Tuple[Optional[str], Optional[str]]:
pos_tags = list(set(line.split(" ", 2)).intersection(self.config["POS_TAGS"]))
        # Guard empty matches: pos_tags[0] on an empty list raises IndexError,
        # and str.split("") raises ValueError.
        pos_tag = pos_tags[0] if pos_tags else None
        sense = line.split(pos_tag, 1)[1].strip() if pos_tag else None
return pos_tag, sense

def _update_forms_senses(self, lemma: str, sense: Optional[str]) -> None:
if sense:
if lemma not in self.forms_senses:
self.forms_senses[lemma] = [sense]
else:
self.forms_senses[lemma].append(sense)

def _update_lemposgw_cfgw(
self,
lemma: str,
pos_tag: Optional[str],
sense: Optional[str],
current_entry: Dict[str, str],
) -> None:
if sense and "gw" in current_entry:
sense_key = f"{lemma}{pos_tag}{sense}"
self.lemposgw_cfgw[sense_key] = (current_entry["cf"], current_entry["gw"])
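
An illustrative parsing sketch for the glossary parser above (not part of the diff; the @entry/@form/@sense lines and the minimal config dict are assumptions modelled on oracc-style glossary markup):

from ebl.atf_importer.application.glossary_parser import GlossaryParser

parser = GlossaryParser({"POS_TAGS": ["N", "V", "AJ"]})  # minimal config stand-in
data = parser.parse(iter([
    "@entry šarru [king] N",  # cf "šarru", gw "king", pos "N"
    "@form LUGAL $šarru",     # a form whose lemma is "šarru"
    "@sense N king",          # sense "king" for the collected lemmas
]))
# data["lemgwpos_cf"]   -> {"šarruNking": "šarru"}
# data["forms_senses"]  -> {"šarru": ["king"]}
# data["lemposgw_cfgw"] -> {"šarruNking": ("šarru", "king")}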