Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions ddss/dump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import sys
import asyncio
from sqlalchemy import select
from apyds_bnf import unparse
from .orm import initialize_database, Facts, Ideas


async def main(addr, engine=None, session=None):
if engine is None or session is None:
engine, session = await initialize_database(addr)

try:
async with session() as sess:
# Output all ideas first
for i in await sess.scalars(select(Ideas)):
print("idea:", unparse(i.data))
# Then output all facts
for f in await sess.scalars(select(Facts)):
print("fact:", unparse(f.data))
finally:
await engine.dispose()
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The engine.dispose() call in the finally block will execute even when engine and session are passed as arguments by tests. This means tests lose control of the engine lifecycle, potentially causing issues if multiple test operations need to be performed with the same engine. Consider only disposing the engine if it was created within this function by checking if the initial engine parameter was None.

Copilot uses AI. Check for mistakes.


if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <database-addr>")
sys.exit(1)
asyncio.run(main(sys.argv[1]))
38 changes: 38 additions & 0 deletions ddss/load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import sys
import asyncio
from apyds_bnf import parse
from .orm import initialize_database, insert_or_ignore, Facts, Ideas
from .utility import str_rule_get_str_idea


async def main(addr, engine=None, session=None):
if engine is None or session is None:
engine, session = await initialize_database(addr)

try:
async with session() as sess:
# Read all lines from stdin
for line in sys.stdin:
data = line.strip()
if not data:
continue

try:
ds = parse(data)
except Exception as e:
print(f"error: {e}", file=sys.stderr)
continue

await insert_or_ignore(sess, Facts, ds)
if idea := str_rule_get_str_idea(ds):
await insert_or_ignore(sess, Ideas, idea)
await sess.commit()
finally:
await engine.dispose()
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The engine.dispose() call in the finally block will execute even when engine and session are passed as arguments by tests. This means tests lose control of the engine lifecycle, potentially causing issues if multiple test operations need to be performed with the same engine. Consider only disposing the engine if it was created within this function by checking if the initial engine parameter was None.

Copilot uses AI. Check for mistakes.


if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <database-addr>")
sys.exit(1)
asyncio.run(main(sys.argv[1]))
129 changes: 129 additions & 0 deletions tests/test_dump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import tempfile
import pathlib
import pytest
import pytest_asyncio
from ddss.orm import initialize_database, Facts, Ideas
from ddss.dump import main


@pytest_asyncio.fixture
async def temp_db():
"""Fixture to create a temporary database."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = pathlib.Path(tmpdir) / "test.db"
addr = f"sqlite+aiosqlite:///{db_path.as_posix()}"
engine, session = await initialize_database(addr)
yield addr, engine, session
await engine.dispose()


@pytest.mark.asyncio
async def test_dump_facts_correctly(temp_db, capsys):
"""Test that Facts data is dumped correctly using unparse()."""
addr, engine, session = temp_db

# Add test data
async with session() as sess:
sess.add(Facts(data="a\n----\nb\n"))
await sess.commit()

# Run the dump function
await main(addr, engine, session)

# Check output
captured = capsys.readouterr()
assert "fact: a => b" in captured.out


@pytest.mark.asyncio
async def test_dump_ideas_correctly(temp_db, capsys):
"""Test that Ideas data is dumped correctly using unparse()."""
addr, engine, session = temp_db

# Add test data
async with session() as sess:
sess.add(Ideas(data="x\n----\ny\n"))
await sess.commit()

# Run the dump function
await main(addr, engine, session)

# Check output
captured = capsys.readouterr()
assert "idea: x => y" in captured.out


@pytest.mark.asyncio
async def test_dump_multiple_entries(temp_db, capsys):
"""Test that dump handles multiple Facts and Ideas correctly."""
addr, engine, session = temp_db

# Add test data
async with session() as sess:
sess.add(Facts(data="a\n----\nb\n"))
sess.add(Facts(data="c\n----\nd\n"))
sess.add(Ideas(data="x\n----\ny\n"))
sess.add(Ideas(data="p\n----\nq\n"))
await sess.commit()

# Run the dump function
await main(addr, engine, session)

# Check output
captured = capsys.readouterr()
assert "idea: x => y" in captured.out
assert "idea: p => q" in captured.out
assert "fact: a => b" in captured.out
assert "fact: c => d" in captured.out


@pytest.mark.asyncio
async def test_dump_empty_database(temp_db, capsys):
"""Test that dump handles an empty database correctly."""
addr, engine, session = temp_db

# Run the dump function with no data
await main(addr, engine, session)

# Check output - should be empty or just whitespace
captured = capsys.readouterr()
assert captured.out.strip() == ""


@pytest.mark.asyncio
async def test_dump_order(temp_db, capsys):
"""Test that ideas are dumped before facts."""
addr, engine, session = temp_db

# Add test data
async with session() as sess:
sess.add(Facts(data="a\n----\nb\n"))
sess.add(Ideas(data="x\n----\ny\n"))
await sess.commit()

# Run the dump function
await main(addr, engine, session)

# Check output order - ideas should come before facts
captured = capsys.readouterr()
idea_pos = captured.out.find("idea: x => y")
fact_pos = captured.out.find("fact: a => b")
assert idea_pos < fact_pos, "Ideas should be printed before facts"


@pytest.mark.asyncio
async def test_dump_with_simple_fact(temp_db, capsys):
"""Test dumping a fact that starts with dashes."""
addr, engine, session = temp_db

# Add test data - a fact that starts with "--"
async with session() as sess:
sess.add(Facts(data="----\nsimple\n"))
await sess.commit()

# Run the dump function
await main(addr, engine, session)

# Check output - unparse converts "----\nsimple\n" to " => simple"
captured = capsys.readouterr()
assert "fact: => simple" in captured.out
201 changes: 201 additions & 0 deletions tests/test_load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import tempfile
import pathlib
from unittest.mock import patch
from io import StringIO
import pytest
import pytest_asyncio
from sqlalchemy import select
from ddss.orm import initialize_database, Facts, Ideas
from ddss.load import main


@pytest_asyncio.fixture
async def temp_db():
"""Fixture to create a temporary database."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = pathlib.Path(tmpdir) / "test.db"
addr = f"sqlite+aiosqlite:///{db_path.as_posix()}"
engine, session = await initialize_database(addr)
yield addr, engine, session
await engine.dispose()


@pytest.mark.asyncio
async def test_load_valid_fact(temp_db):
"""Test that valid input is parsed and stored as a Fact in the database."""
addr, engine, session = temp_db

# Mock stdin with valid input
mock_stdin = StringIO("a => b\n")

with patch("sys.stdin", mock_stdin):
await main(addr, engine, session)

# Verify data was stored
async with session() as sess:
facts = await sess.scalars(select(Facts))
facts_list = list(facts)
assert len(facts_list) == 1
assert facts_list[0].data == "a\n----\nb\n"


@pytest.mark.asyncio
async def test_load_generates_idea(temp_db):
"""Test that input that parses to non-dashed form generates an Idea."""
addr, engine, session = temp_db

# Mock stdin with valid input
mock_stdin = StringIO("a => b\n")

with patch("sys.stdin", mock_stdin):
await main(addr, engine, session)

# Verify both Fact and Idea were stored
async with session() as sess:
facts = await sess.scalars(select(Facts))
facts_list = list(facts)
assert len(facts_list) == 1
assert facts_list[0].data == "a\n----\nb\n"

ideas = await sess.scalars(select(Ideas))
ideas_list = list(ideas)
assert len(ideas_list) == 1
assert ideas_list[0].data == "----\na\n"


@pytest.mark.asyncio
async def test_load_invalid_input_handling(temp_db, capsys):
"""Test that invalid/malformed input is handled gracefully with error message."""
addr, engine, session = temp_db

# Mock stdin with invalid input
mock_stdin = StringIO("=>\n")

with patch("sys.stdin", mock_stdin):
await main(addr, engine, session)

# Check that error was printed to stderr
captured = capsys.readouterr()
assert "error:" in captured.err

# Verify no data was stored
async with session() as sess:
facts = await sess.scalars(select(Facts))
facts_list = list(facts)
assert len(facts_list) == 0


@pytest.mark.asyncio
async def test_load_empty_input_skipped(temp_db):
"""Test that empty lines are skipped."""
addr, engine, session = temp_db

# Mock stdin with empty lines and valid input
mock_stdin = StringIO("\n \na => b\n")

with patch("sys.stdin", mock_stdin):
await main(addr, engine, session)

# Verify only the valid input was stored
async with session() as sess:
facts = await sess.scalars(select(Facts))
facts_list = list(facts)
assert len(facts_list) == 1
assert facts_list[0].data == "a\n----\nb\n"


@pytest.mark.asyncio
async def test_load_multiple_entries(temp_db):
"""Test that multiple valid inputs are stored correctly."""
addr, engine, session = temp_db

# Mock stdin with multiple valid inputs
mock_stdin = StringIO("a => b\nc => d\nsimple\n")

with patch("sys.stdin", mock_stdin):
await main(addr, engine, session)

# Verify all data was stored
async with session() as sess:
facts = await sess.scalars(select(Facts))
facts_list = list(facts)
# Should have 3 facts
assert len(facts_list) == 3
fact_data = [f.data for f in facts_list]
assert "a\n----\nb\n" in fact_data
assert "c\n----\nd\n" in fact_data
assert "----\nsimple\n" in fact_data

# Should have 2 ideas (from "a => b" and "c => d")
ideas = await sess.scalars(select(Ideas))
ideas_list = list(ideas)
assert len(ideas_list) == 2
idea_data = [i.data for i in ideas_list]
assert "----\na\n" in idea_data
assert "----\nc\n" in idea_data


@pytest.mark.asyncio
async def test_load_mixed_valid_and_invalid(temp_db, capsys):
"""Test that valid inputs are stored even when mixed with invalid ones."""
addr, engine, session = temp_db

# Mock stdin with mixed valid and invalid inputs
mock_stdin = StringIO("a => b\n=>\nc => d\n")

with patch("sys.stdin", mock_stdin):
await main(addr, engine, session)

# Check that error was printed for invalid input
captured = capsys.readouterr()
assert "error:" in captured.err

# Verify valid data was stored
async with session() as sess:
facts = await sess.scalars(select(Facts))
facts_list = list(facts)
assert len(facts_list) == 2
fact_data = [f.data for f in facts_list]
assert "a\n----\nb\n" in fact_data
assert "c\n----\nd\n" in fact_data


@pytest.mark.asyncio
async def test_load_empty_stdin(temp_db):
"""Test that loading from empty stdin completes without errors."""
addr, engine, session = temp_db

# Mock empty stdin
mock_stdin = StringIO("")

with patch("sys.stdin", mock_stdin):
await main(addr, engine, session)

# Verify no data was stored
async with session() as sess:
facts = await sess.scalars(select(Facts))
facts_list = list(facts)
assert len(facts_list) == 0


@pytest.mark.asyncio
async def test_load_duplicate_entries(temp_db):
"""Test that duplicate entries are ignored due to unique constraint."""
addr, engine, session = temp_db

# Pre-populate with one entry
async with session() as sess:
sess.add(Facts(data="a\n----\nb\n"))
await sess.commit()

# Try to load the same entry again
mock_stdin = StringIO("a => b\n")

with patch("sys.stdin", mock_stdin):
await main(addr, engine, session)

# Verify only one entry exists (duplicate was ignored)
async with session() as sess:
facts = await sess.scalars(select(Facts))
facts_list = list(facts)
assert len(facts_list) == 1