Update ATF importer #552
Open
khoidt wants to merge 35 commits into master from atf-import-update
Commits (35)
All 35 commits are by khoidt:

68d54f7 Refactor atf importer (WiP)
e3e058a Update ebl/atf_importer/application/lemma_lookup.py
ab33279 Update ebl/atf_importer/application/lemma_lookup.py
f6bed68 Update ebl/atf_importer/application/atf_importer_base.py
44f2daf Update ebl/atf_importer/application/atf_importer_base.py
a3e9e52 Update ebl/atf_importer/domain/atf_preprocessor_cdli.py
43a9cfa Update ebl/atf_importer/domain/atf_preprocessor_cdli.py
860ad08 Update ebl/atf_importer/domain/atf_preprocessor_cdli.py
171ebac Fix lark paths
65aa777 Update test
0ffb31e Refactor & update
72813c8 Clean up
6bdc503 Refactor more
b50e5b9 Update
08f4430 Fix type
ac21caa Improve
4f53377 Improve
cf568d2 Update & fix preprocessor tests
9c19bc4 Refactor & update
115c5f6 Fix test (use transliteration chars)
0426000 Improve
7464010 Fix glossary data (WiP)
8d03672 Update, improve & refactor to fix test (WiP)
b1c8081 Update, refactor & add logging (WiP)
a7e070a Update logging & improve
e662c72 Refactor
fb84e2c Update logging
e39e8b3 Update preprocessor & add importer test (WiP)
14be543 Update atf preprocessor (WiP)
18a1ef7 Fix
3c50de9 Update structure, use only ebl atf parser (WiP)
1cea5ff Refactor, update & fix tests (WiP)
172631a Update (WiP)
b2a0405 Refactor & fix tests (WiP)
4e4693f Merge remote-tracking branch 'origin/master' into atf-import-update
New file @@ -0,0 +1,35 @@
import json
from typing import Dict, List, TypedDict, Union, Literal, Optional


class CliArgKwargs(TypedDict, total=False):
    required: bool
    help: str
    default: Optional[str]
    choices: Optional[List[str]]


class CliArgument(TypedDict):
    flags: List[str]
    kwargs: CliArgKwargs


class AtfImporterConfigData(TypedDict):
    STYLES: Dict[int, str]
    POS_TAGS: List[str]
    NOUN_POS_TAGS: List[str]
    CLI_ARGS: List[CliArgument]


class AtfImporterConfig:
    config_data: AtfImporterConfigData
    config_path = "ebl/atf_importer/domain/atf_importer_config.json"

    def __init__(self):
        with open(self.config_path, "r") as file:
            self.config_data: AtfImporterConfigData = json.load(file)

    def __getitem__(
        self, item: Literal["STYLES", "POS_TAGS", "NOUN_POS_TAGS", "CLI_ARGS"]
    ) -> Union[Dict[int, str], List[str], List[CliArgument]]:
        # config_data is a dict loaded from JSON, so index it; getattr would
        # raise AttributeError on a plain dict
        return self.config_data[item]
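For context, a minimal sketch of a JSON file matching the AtfImporterConfigData shape and of the plain-dict indexing that AtfImporterConfig performs. All keys and values below are illustrative assumptions, not the project's actual config; note also that JSON object keys are always strings, so the loaded STYLES mapping has string keys at runtime.

```python
import json
import os
import tempfile

# Hypothetical config in the AtfImporterConfigData shape (values made up)
config = {
    "STYLES": {"0": "Oracc"},
    "POS_TAGS": ["N", "V", "AJ"],
    "NOUN_POS_TAGS": ["N"],
    "CLI_ARGS": [
        {"flags": ["-i", "--input"], "kwargs": {"required": True, "help": "Input dir"}}
    ],
}

path = os.path.join(tempfile.mkdtemp(), "atf_importer_config.json")
with open(path, "w") as file:
    json.dump(config, file)

# Mirrors AtfImporterConfig.__getitem__: dict indexing on the loaded JSON
with open(path, "r") as file:
    config_data = json.load(file)
pos_tags = config_data["POS_TAGS"]
```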
New file @@ -0,0 +1,167 @@
from typing import Dict, List, Tuple, Optional, Sequence
from ebl.app import create_context
from ebl.fragmentarium.application.fragment_updater import FragmentUpdater
from ebl.fragmentarium.application.transliteration_update_factory import (
    TransliterationUpdateFactory,
)
from ebl.fragmentarium.web.dtos import parse_museum_number
from ebl.lemmatization.domain.lemmatization import Lemmatization, LemmatizationToken
from ebl.transliteration.domain.atf import Atf
from ebl.transliteration.domain.lark_parser import parse_atf_lark
from ebl.users.domain.user import AtfImporterUser
from ebl.atf_importer.domain.atf_preprocessor_util import Util


class DatabaseImporter:
    def __init__(self, database, logger, username: str):
        self.database = database
        self.logger = logger
        self.user = AtfImporterUser(username)
        context = create_context()
        self.transliteration_factory: TransliterationUpdateFactory = (
            context.get_transliteration_update_factory()
        )
        self.updater: FragmentUpdater = context.get_fragment_updater()

    def import_into_database(self, ebl_lines: Dict[str, List], filename: str):
        museum_number: Optional[str] = self._retrieve_museum_number(ebl_lines, filename)
        if not museum_number:
            self.logger.error(
                "(unknown) could not be imported: Museum number not found",
                "not_imported_files",
            )
            self.logger.info(Util.print_frame('Conversion of "(unknown).atf" failed'))
            return
        if self._check_fragment_exists(museum_number):
            self._import(ebl_lines, museum_number, filename)

    def _import(self, ebl_lines: Dict[str, List], museum_number: str, filename: str):
        try:
            self._insert_transliterations(
                ebl_lines["transliteration"],
                museum_number,
            )
            self._insert_lemmatization(ebl_lines["lemmatization"], museum_number)
            self.logger.info("(unknown) successfully imported", "imported_files")
        except Exception as e:
            self.logger.error(
                f"Error importing (unknown): {str(e)}", "not_imported_files"
            )

    def _get_valid_museum_number_or_none(
        self, museum_number_string: str
    ) -> Optional[str]:
        try:
            parse_museum_number(museum_number_string)
            self.logger.info(f"Museum number '{museum_number_string}' is valid")
            return museum_number_string
        except ValueError:
            return None

    def _retrieve_museum_number(
        self, ebl_lines: Dict[str, List], filename: str
    ) -> Optional[str]:
        if museum_number := self._get_museum_number_by_cdli_number(
            ebl_lines["control_lines"]
        ):
            return museum_number
        for line in ebl_lines["control_lines"]:
            linesplit = line["c_line"].split("=")
            if len(linesplit) > 1 and (
                museum_number := self._get_valid_museum_number_or_none(
                    linesplit[-1].strip()
                )
            ):
                return museum_number
        self.logger.error("Could not find a valid museum number in '(unknown)'")
        return self._input_museum_number(filename)

    def _input_museum_number(
        self, filename: str, museum_number: Optional[str] = None
    ) -> Optional[str]:
        while museum_number is None:
            museum_number_input = input(
                "Please enter a valid museum number (enter 'skip' to skip this file): "
            )
            if museum_number_input.lower() == "skip":
                return None
            museum_number = self._get_valid_museum_number_or_none(museum_number_input)
        return museum_number

    def _get_museum_number_by_cdli_number(self, control_lines) -> Optional[str]:
        if cdli_number := self._get_cdli_number(control_lines):
            for entry in self.database.get_collection("fragments").find(
                {"externalNumbers.cdliNumber": cdli_number}, {"museumNumber"}
            ):
                if "_id" in entry.keys():
                    return entry["_id"]
            self.logger.warning(
                f"No museum number to CDLI number '{cdli_number}' found."
                " Trying to parse from the original file..."
            )
        return None

    @staticmethod
    def _get_cdli_number(control_lines) -> Optional[str]:
        # Only the first control line is inspected for the CDLI number
        for line in control_lines:
            return line["c_line"].split("=")[0].strip().replace("&", "")
        return None

    def _check_fragment_exists(self, museum_number: str) -> bool:
        exists = list(
            self.database.get_collection("fragments").find(
                {"museumNumber": museum_number}, {"text.lines.0"}
            )
        )
        return bool(exists)

    def _insert_transliterations(
        self,
        transliterations: List[str],
        museum_number: str,
    ) -> None:
        converted_transliteration = "\n".join(transliterations)
        transliteration = self.transliteration_factory.create(
            Atf(converted_transliteration)
        )
        self.updater.update_transliteration(
            parse_museum_number(museum_number), transliteration, self.user
        )

    def _insert_lemmatization(
        self,
        lemmatizations: List[Tuple[str, List[Dict]]],
        museum_number: str,
    ):
        lemmatization_tokens = self._get_lemmatization_tokens(lemmatizations)
        lemmatization = Lemmatization([lemmatization_tokens])
        self.updater.update_lemmatization(
            parse_museum_number(museum_number), lemmatization, self.user
        )

    def _get_lemmatization_tokens(
        self, lemmatizations: List[Tuple[str, List[Dict]]]
    ) -> Sequence[LemmatizationToken]:
        lemmatization_tokens: List[LemmatizationToken] = []
        for text_line, lemmas in lemmatizations:
            ebl_lines = parse_atf_lark(text_line).lines[0].content
            lemmatization_tokens = self._get_lemmatization_tokens_in_lines(
                ebl_lines, lemmas, lemmatization_tokens
            )
        return lemmatization_tokens

    def _get_lemmatization_tokens_in_lines(
        self,
        ebl_lines,
        lemmas,
        lemmatization_tokens: List[LemmatizationToken],
    ) -> List[LemmatizationToken]:
        for token in ebl_lines:
            lemma_ids = [
                lemma["_id"] for lemma in lemmas if lemma["lemma"] == token.value
            ]
            lemmatization_tokens.append(
                LemmatizationToken(token.value, tuple(lemma_ids) if lemma_ids else None)
            )
        return lemmatization_tokens
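To illustrate the pairing step in _get_lemmatization_tokens_in_lines, here is a standalone sketch with a stand-in LemmatizationToken class and made-up token and lemma data; it is not the importer's real classes, only the matching logic in isolation:

```python
from typing import NamedTuple, Optional, Tuple


class LemmatizationToken(NamedTuple):
    # Stand-in for ebl.lemmatization.domain.lemmatization.LemmatizationToken
    value: str
    unique_lemma: Optional[Tuple[str, ...]] = None


token_values = ["kal-bu", "i-qab-bi"]  # hypothetical transliteration tokens
lemmas = [{"lemma": "kal-bu", "_id": "kalbu I"}]  # hypothetical lookup result

tokens = [
    LemmatizationToken(
        value, tuple(lem["_id"] for lem in lemmas if lem["lemma"] == value) or None
    )
    for value in token_values
]
# Tokens without a matching lemma keep unique_lemma=None, as in the importer
```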
New file @@ -0,0 +1,117 @@
import re
from typing import Dict, List, Tuple, Optional, Iterator, TypedDict
from ebl.atf_importer.application.atf_importer_config import AtfImporterConfigData


class GlossaryParserData(TypedDict):
    lemgwpos_cf: Dict[str, str]
    forms_senses: Dict[str, List[str]]
    lemposgw_cfgw: Dict[str, Tuple[str, str]]


class GlossaryParser:
    def __init__(self, config: AtfImporterConfigData):
        self.config = config
        self.lemgwpos_cf: Dict[str, str] = {}
        self.forms_senses: Dict[str, List[str]] = {}
        self.lemposgw_cfgw: Dict[str, Tuple[str, str]] = {}

    @property
    def data(self) -> GlossaryParserData:
        return {
            "lemgwpos_cf": self.lemgwpos_cf,
            "forms_senses": self.forms_senses,
            "lemposgw_cfgw": self.lemposgw_cfgw,
        }

    def parse(self, file: Iterator[str]) -> GlossaryParserData:
        current_entry: Dict[str, str] = {}
        lemmas: List[str] = []
        for line in file:
            line = line.strip()
            if line.startswith("@entry"):
                lemmas, current_entry = self._handle_entry(line, lemmas)
            elif line.startswith("@form"):
                lemmas = self._handle_form(line, current_entry, lemmas)
            elif line.startswith("@sense"):
                self._handle_sense(line, lemmas, current_entry)
        return self.data

    def _handle_entry(
        self, line: str, lemmas: List[str]
    ) -> Tuple[List[str], Dict[str, str]]:
        lemmas.clear()
        return lemmas, self._parse_entry(line)

    def _handle_form(
        self, line: str, current_entry: Dict[str, str], lemmas: List[str]
    ) -> List[str]:
        lemma = self._parse_form(line, current_entry)
        if lemma:
            lemmas.append(lemma)
        return lemmas

    def _handle_sense(
        self, line: str, lemmas: List[str], current_entry: Dict[str, str]
    ) -> None:
        self._parse_sense(line, lemmas, current_entry)

    def _parse_entry(self, line: str) -> Dict[str, str]:
        entry = {}
        parts = line.split(" ", 2)
        if len(parts) > 1:
            entry["cf"] = parts[1].replace("ʾ", "'").strip()
            description = parts[2] if len(parts) > 2 else ""
            if match := re.search(r"\[(.*?)\] (.*)", description):
                entry["gw"], entry["pos"] = match.groups()
                entry["gw"] = entry["gw"].strip()
                entry["pos"] = entry["pos"].strip()
        return entry

    def _parse_form(self, line: str, current_entry: Dict[str, str]) -> Optional[str]:
        parts = line.split(" ")
        if len(parts) > 2:
            lemma = parts[2].lstrip("$").rstrip("\n")
            if (
                "cf" in current_entry
                and "gw" in current_entry
                and "pos" in current_entry
            ):
                key = f"{lemma}{current_entry['pos']}{current_entry['gw']}"
                self.lemgwpos_cf[key] = current_entry["cf"]
            return lemma
        return None

    def _parse_sense(
        self, line: str, lemmas: List[str], current_entry: Dict[str, str]
    ) -> None:
        pos_tag, sense = self._extract_pos_tag_and_sense(line)
        for lemma in lemmas:
            self._update_forms_senses(lemma, sense)
            self._update_lemposgw_cfgw(lemma, pos_tag, sense, current_entry)

    def _extract_pos_tag_and_sense(
        self, line: str
    ) -> Tuple[Optional[str], Optional[str]]:
        pos_tags = list(set(line.split(" ", 2)).intersection(self.config["POS_TAGS"]))
        if not pos_tags:
            # Guard against an IndexError when no known POS tag is on the line
            return None, None
        pos_tag = pos_tags[0]
        sense = line.split(pos_tag)[1].rstrip("\n")
        return pos_tag, sense

    def _update_forms_senses(self, lemma: str, sense: Optional[str]) -> None:
        if sense:
            if lemma not in self.forms_senses:
                self.forms_senses[lemma] = [sense]
            else:
                self.forms_senses[lemma].append(sense)

    def _update_lemposgw_cfgw(
        self,
        lemma: str,
        pos_tag: Optional[str],
        sense: Optional[str],
        current_entry: Dict[str, str],
    ) -> None:
        if sense and "gw" in current_entry:
            sense_key = f"{lemma}{pos_tag}{sense}"
            self.lemposgw_cfgw[sense_key] = (current_entry["cf"], current_entry["gw"])
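For illustration, a standalone sketch of how GlossaryParser._parse_entry splits an @entry line into citation form, guide word, and part of speech. The sample line is a made-up Akkadian entry, not taken from a real glossary:

```python
import re


def parse_entry(line: str) -> dict:
    # Same splitting logic as GlossaryParser._parse_entry:
    # "@entry <cf> [<gw>] <pos>" -> {"cf": ..., "gw": ..., "pos": ...}
    entry = {}
    parts = line.split(" ", 2)
    if len(parts) > 1:
        entry["cf"] = parts[1].replace("ʾ", "'").strip()
        description = parts[2] if len(parts) > 2 else ""
        if match := re.search(r"\[(.*?)\] (.*)", description):
            entry["gw"], entry["pos"] = (g.strip() for g in match.groups())
    return entry


result = parse_entry("@entry kalbu [dog] N")
# -> {'cf': 'kalbu', 'gw': 'dog', 'pos': 'N'}
```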
Check notice (Code scanning / CodeQL): Explicit returns mixed with implicit (fall through) returns.