-
Notifications
You must be signed in to change notification settings - Fork 0
Add dump.py and load.py for one-time stdin/stdout database I/O #36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
87ad473
ffa13d0
7182a23
13b52e5
214af6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| import sys | ||
| import asyncio | ||
| from sqlalchemy import select | ||
| from apyds_bnf import unparse | ||
| from .orm import initialize_database, Facts, Ideas | ||
|
|
||
|
|
||
| async def main(addr, engine=None, session=None): | ||
| if engine is None or session is None: | ||
| engine, session = await initialize_database(addr) | ||
|
|
||
| try: | ||
| async with session() as sess: | ||
| # Output all ideas first | ||
| for i in await sess.scalars(select(Ideas)): | ||
| print("idea:", unparse(i.data)) | ||
| # Then output all facts | ||
| for f in await sess.scalars(select(Facts)): | ||
| print("fact:", unparse(f.data)) | ||
| finally: | ||
| await engine.dispose() | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| if len(sys.argv) != 2: | ||
| print(f"Usage: {sys.argv[0]} <database-addr>") | ||
| sys.exit(1) | ||
| asyncio.run(main(sys.argv[1])) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| import sys | ||
| import asyncio | ||
| from apyds_bnf import parse | ||
| from .orm import initialize_database, insert_or_ignore, Facts, Ideas | ||
| from .utility import str_rule_get_str_idea | ||
|
|
||
|
|
||
| async def main(addr, engine=None, session=None): | ||
| if engine is None or session is None: | ||
| engine, session = await initialize_database(addr) | ||
|
|
||
| try: | ||
| async with session() as sess: | ||
| # Read all lines from stdin | ||
| for line in sys.stdin: | ||
| data = line.strip() | ||
| if not data: | ||
| continue | ||
|
|
||
| try: | ||
| ds = parse(data) | ||
| except Exception as e: | ||
| print(f"error: {e}", file=sys.stderr) | ||
| continue | ||
|
|
||
| await insert_or_ignore(sess, Facts, ds) | ||
| if idea := str_rule_get_str_idea(ds): | ||
| await insert_or_ignore(sess, Ideas, idea) | ||
| await sess.commit() | ||
| finally: | ||
| await engine.dispose() | ||
|
||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| if len(sys.argv) != 2: | ||
| print(f"Usage: {sys.argv[0]} <database-addr>") | ||
| sys.exit(1) | ||
| asyncio.run(main(sys.argv[1])) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| import tempfile | ||
| import pathlib | ||
| import pytest | ||
| import pytest_asyncio | ||
| from ddss.orm import initialize_database, Facts, Ideas | ||
| from ddss.dump import main | ||
|
|
||
|
|
||
| @pytest_asyncio.fixture | ||
| async def temp_db(): | ||
| """Fixture to create a temporary database.""" | ||
| with tempfile.TemporaryDirectory() as tmpdir: | ||
| db_path = pathlib.Path(tmpdir) / "test.db" | ||
| addr = f"sqlite+aiosqlite:///{db_path.as_posix()}" | ||
| engine, session = await initialize_database(addr) | ||
| yield addr, engine, session | ||
| await engine.dispose() | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_dump_facts_correctly(temp_db, capsys): | ||
| """Test that Facts data is dumped correctly using unparse().""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Add test data | ||
| async with session() as sess: | ||
| sess.add(Facts(data="a\n----\nb\n")) | ||
| await sess.commit() | ||
|
|
||
| # Run the dump function | ||
| await main(addr, engine, session) | ||
|
|
||
| # Check output | ||
| captured = capsys.readouterr() | ||
| assert "fact: a => b" in captured.out | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_dump_ideas_correctly(temp_db, capsys): | ||
| """Test that Ideas data is dumped correctly using unparse().""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Add test data | ||
| async with session() as sess: | ||
| sess.add(Ideas(data="x\n----\ny\n")) | ||
| await sess.commit() | ||
|
|
||
| # Run the dump function | ||
| await main(addr, engine, session) | ||
|
|
||
| # Check output | ||
| captured = capsys.readouterr() | ||
| assert "idea: x => y" in captured.out | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_dump_multiple_entries(temp_db, capsys): | ||
| """Test that dump handles multiple Facts and Ideas correctly.""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Add test data | ||
| async with session() as sess: | ||
| sess.add(Facts(data="a\n----\nb\n")) | ||
| sess.add(Facts(data="c\n----\nd\n")) | ||
| sess.add(Ideas(data="x\n----\ny\n")) | ||
| sess.add(Ideas(data="p\n----\nq\n")) | ||
| await sess.commit() | ||
|
|
||
| # Run the dump function | ||
| await main(addr, engine, session) | ||
|
|
||
| # Check output | ||
| captured = capsys.readouterr() | ||
| assert "idea: x => y" in captured.out | ||
| assert "idea: p => q" in captured.out | ||
| assert "fact: a => b" in captured.out | ||
| assert "fact: c => d" in captured.out | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_dump_empty_database(temp_db, capsys): | ||
| """Test that dump handles an empty database correctly.""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Run the dump function with no data | ||
| await main(addr, engine, session) | ||
|
|
||
| # Check output - should be empty or just whitespace | ||
| captured = capsys.readouterr() | ||
| assert captured.out.strip() == "" | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_dump_order(temp_db, capsys): | ||
| """Test that ideas are dumped before facts.""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Add test data | ||
| async with session() as sess: | ||
| sess.add(Facts(data="a\n----\nb\n")) | ||
| sess.add(Ideas(data="x\n----\ny\n")) | ||
| await sess.commit() | ||
|
|
||
| # Run the dump function | ||
| await main(addr, engine, session) | ||
|
|
||
| # Check output order - ideas should come before facts | ||
| captured = capsys.readouterr() | ||
| idea_pos = captured.out.find("idea: x => y") | ||
| fact_pos = captured.out.find("fact: a => b") | ||
| assert idea_pos < fact_pos, "Ideas should be printed before facts" | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_dump_with_simple_fact(temp_db, capsys): | ||
| """Test dumping a fact that starts with dashes.""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Add test data - a fact that starts with "--" | ||
| async with session() as sess: | ||
| sess.add(Facts(data="----\nsimple\n")) | ||
| await sess.commit() | ||
|
|
||
| # Run the dump function | ||
| await main(addr, engine, session) | ||
|
|
||
| # Check output - unparse converts "----\nsimple\n" to " => simple" | ||
| captured = capsys.readouterr() | ||
| assert "fact: => simple" in captured.out |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
| import tempfile | ||
| import pathlib | ||
| from unittest.mock import patch | ||
| from io import StringIO | ||
| import pytest | ||
| import pytest_asyncio | ||
| from sqlalchemy import select | ||
| from ddss.orm import initialize_database, Facts, Ideas | ||
| from ddss.load import main | ||
|
|
||
|
|
||
| @pytest_asyncio.fixture | ||
| async def temp_db(): | ||
| """Fixture to create a temporary database.""" | ||
| with tempfile.TemporaryDirectory() as tmpdir: | ||
| db_path = pathlib.Path(tmpdir) / "test.db" | ||
| addr = f"sqlite+aiosqlite:///{db_path.as_posix()}" | ||
| engine, session = await initialize_database(addr) | ||
| yield addr, engine, session | ||
| await engine.dispose() | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_load_valid_fact(temp_db): | ||
| """Test that valid input is parsed and stored as a Fact in the database.""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Mock stdin with valid input | ||
| mock_stdin = StringIO("a => b\n") | ||
|
|
||
| with patch("sys.stdin", mock_stdin): | ||
| await main(addr, engine, session) | ||
|
|
||
| # Verify data was stored | ||
| async with session() as sess: | ||
| facts = await sess.scalars(select(Facts)) | ||
| facts_list = list(facts) | ||
| assert len(facts_list) == 1 | ||
| assert facts_list[0].data == "a\n----\nb\n" | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_load_generates_idea(temp_db): | ||
| """Test that input that parses to non-dashed form generates an Idea.""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Mock stdin with valid input | ||
| mock_stdin = StringIO("a => b\n") | ||
|
|
||
| with patch("sys.stdin", mock_stdin): | ||
| await main(addr, engine, session) | ||
|
|
||
| # Verify both Fact and Idea were stored | ||
| async with session() as sess: | ||
| facts = await sess.scalars(select(Facts)) | ||
| facts_list = list(facts) | ||
| assert len(facts_list) == 1 | ||
| assert facts_list[0].data == "a\n----\nb\n" | ||
|
|
||
| ideas = await sess.scalars(select(Ideas)) | ||
| ideas_list = list(ideas) | ||
| assert len(ideas_list) == 1 | ||
| assert ideas_list[0].data == "----\na\n" | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_load_invalid_input_handling(temp_db, capsys): | ||
| """Test that invalid/malformed input is handled gracefully with error message.""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Mock stdin with invalid input | ||
| mock_stdin = StringIO("=>\n") | ||
|
|
||
| with patch("sys.stdin", mock_stdin): | ||
| await main(addr, engine, session) | ||
|
|
||
| # Check that error was printed to stderr | ||
| captured = capsys.readouterr() | ||
| assert "error:" in captured.err | ||
|
|
||
| # Verify no data was stored | ||
| async with session() as sess: | ||
| facts = await sess.scalars(select(Facts)) | ||
| facts_list = list(facts) | ||
| assert len(facts_list) == 0 | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_load_empty_input_skipped(temp_db): | ||
| """Test that empty lines are skipped.""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Mock stdin with empty lines and valid input | ||
| mock_stdin = StringIO("\n \na => b\n") | ||
|
|
||
| with patch("sys.stdin", mock_stdin): | ||
| await main(addr, engine, session) | ||
|
|
||
| # Verify only the valid input was stored | ||
| async with session() as sess: | ||
| facts = await sess.scalars(select(Facts)) | ||
| facts_list = list(facts) | ||
| assert len(facts_list) == 1 | ||
| assert facts_list[0].data == "a\n----\nb\n" | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_load_multiple_entries(temp_db): | ||
| """Test that multiple valid inputs are stored correctly.""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Mock stdin with multiple valid inputs | ||
| mock_stdin = StringIO("a => b\nc => d\nsimple\n") | ||
|
|
||
| with patch("sys.stdin", mock_stdin): | ||
| await main(addr, engine, session) | ||
|
|
||
| # Verify all data was stored | ||
| async with session() as sess: | ||
| facts = await sess.scalars(select(Facts)) | ||
| facts_list = list(facts) | ||
| # Should have 3 facts | ||
| assert len(facts_list) == 3 | ||
| fact_data = [f.data for f in facts_list] | ||
| assert "a\n----\nb\n" in fact_data | ||
| assert "c\n----\nd\n" in fact_data | ||
| assert "----\nsimple\n" in fact_data | ||
|
|
||
| # Should have 2 ideas (from "a => b" and "c => d") | ||
| ideas = await sess.scalars(select(Ideas)) | ||
| ideas_list = list(ideas) | ||
| assert len(ideas_list) == 2 | ||
| idea_data = [i.data for i in ideas_list] | ||
| assert "----\na\n" in idea_data | ||
| assert "----\nc\n" in idea_data | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_load_mixed_valid_and_invalid(temp_db, capsys): | ||
| """Test that valid inputs are stored even when mixed with invalid ones.""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Mock stdin with mixed valid and invalid inputs | ||
| mock_stdin = StringIO("a => b\n=>\nc => d\n") | ||
|
|
||
| with patch("sys.stdin", mock_stdin): | ||
| await main(addr, engine, session) | ||
|
|
||
| # Check that error was printed for invalid input | ||
| captured = capsys.readouterr() | ||
| assert "error:" in captured.err | ||
|
|
||
| # Verify valid data was stored | ||
| async with session() as sess: | ||
| facts = await sess.scalars(select(Facts)) | ||
| facts_list = list(facts) | ||
| assert len(facts_list) == 2 | ||
| fact_data = [f.data for f in facts_list] | ||
| assert "a\n----\nb\n" in fact_data | ||
| assert "c\n----\nd\n" in fact_data | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_load_empty_stdin(temp_db): | ||
| """Test that loading from empty stdin completes without errors.""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Mock empty stdin | ||
| mock_stdin = StringIO("") | ||
|
|
||
| with patch("sys.stdin", mock_stdin): | ||
| await main(addr, engine, session) | ||
|
|
||
| # Verify no data was stored | ||
| async with session() as sess: | ||
| facts = await sess.scalars(select(Facts)) | ||
| facts_list = list(facts) | ||
| assert len(facts_list) == 0 | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_load_duplicate_entries(temp_db): | ||
| """Test that duplicate entries are ignored due to unique constraint.""" | ||
| addr, engine, session = temp_db | ||
|
|
||
| # Pre-populate with one entry | ||
| async with session() as sess: | ||
| sess.add(Facts(data="a\n----\nb\n")) | ||
| await sess.commit() | ||
|
|
||
| # Try to load the same entry again | ||
| mock_stdin = StringIO("a => b\n") | ||
|
|
||
| with patch("sys.stdin", mock_stdin): | ||
| await main(addr, engine, session) | ||
|
|
||
| # Verify only one entry exists (duplicate was ignored) | ||
| async with session() as sess: | ||
| facts = await sess.scalars(select(Facts)) | ||
| facts_list = list(facts) | ||
| assert len(facts_list) == 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The engine.dispose() call in the finally block will execute even when engine and session are passed as arguments by tests. This means tests lose control of the engine lifecycle, potentially causing issues if multiple test operations need to be performed with the same engine. Consider only disposing the engine if it was created within this function by checking if the initial engine parameter was None.