Skip to content

Commit

Permalink
feat: add core and handlers for lists and db
Browse files Browse the repository at this point in the history
Signed-off-by: Harshal Chaudhari <[email protected]>
  • Loading branch information
harshalchaudhari35 committed Mar 27, 2023
1 parent 5cbb1ef commit b654929
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 0 deletions.
Empty file added yapyfuzz/__init__.py
Empty file.
49 changes: 49 additions & 0 deletions yapyfuzz/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
import tempfile
from shutil import which

from yapyfuzz.handler import ListHandler, SQLiteHandler

# constants
FZF_URL = "https://github.com/junegunn/fzf"


class Fuzzy:
def __init__(
self, handler: ListHandler | SQLiteHandler, exec_path=None
):
self.input_file = None
self.output_file = None
self.handler = handler

if exec_path:
self.exec_path = exec_path
elif not which("fzf") and not exec_path:
raise SystemError(f"Cannot find 'fzf' installed on PATH. ({FZF_URL})")
else:
self.exec_path = "fzf"

def get_selection(self, fzf_options=""):
input = self.handler.get_utf8_str()
selection = []

with tempfile.NamedTemporaryFile(delete=False) as input_file:
with tempfile.NamedTemporaryFile(delete=False) as output_file:
# Create a temp file with list entries as lines
input_file.write(input)
input_file.flush()

# Invoke fzf externally and write to output file
os.system(
f'{self.exec_path} {fzf_options} < "{input_file.name}" > "{output_file.name}"'
)

# get selected options
with open(output_file.name, encoding="utf-8") as f:
for line in f:
selection.append(line.strip("\n"))

os.unlink(input_file.name)
os.unlink(output_file.name)

return selection
57 changes: 57 additions & 0 deletions yapyfuzz/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from abc import ABC
import psycopg2
import pandas as pd
import tempfile

import os


class Handler(ABC):
def reader(self) -> None:
pass

def get_utf8_str(self) -> None:
pass


class ListHandler(Handler):
def reader(self, choices, delimiter="\n") -> str:
# convert a list to a string [ 1, 2, 3 ] => "1\n2\n3"
self.output = delimiter.join(map(str, choices))

def get_utf8_str(self) -> None:
return self.output.encode("utf-8")


class SQLiteHandler(Handler):
def reader(self, name: str, query: str) -> str:
self.output = os.popen(f'sqlite3 {name} "{query}"').read()

def get_utf8_str(self) -> None:
return self.output.encode("utf-8")


class PostgresHandler(Handler):
def reader(self, query: str, **kwargs) -> str:
with psycopg2.connect(
**kwargs,
) as connection:
connection.autocommit = True

self.df = pd.read_sql(query, con=connection)

def get_utf8_str(self) -> None:
self.df_string = []
with tempfile.NamedTemporaryFile(mode="r+", delete=False) as output_file:
# Create a temp file with list entries as lines
self.df.to_csv(output_file, index=False)

with open(output_file.name, encoding="utf-8") as f:
for line in f:
self.df_string.append(line)

self.df_string = "".join(map(str, self.df_string))
os.unlink(output_file.name)

return self.df_string.encode("utf-8")

0 comments on commit b654929

Please sign in to comment.