Skip to content

Commit

Permalink
Added a non-asyn dbretriever for querying sql databases.
Browse files Browse the repository at this point in the history
  • Loading branch information
disterics committed May 23, 2012
1 parent 2015ecd commit 33670f3
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 0 deletions.
Empty file.
102 changes: 102 additions & 0 deletions leftronicd/contrib/database/databaseRetriever.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from sqlalchemy import create_engine, select, MetaData, Table, func
import json

__lastauthor__ = '[email protected]'
__company__ = 'Leftronic'


class DatabaseRetriever:
DB_TYPE_TO_PREFIX_MAP = {
'mysql' : 'mysql://%s:%s@%s/%s',
'postgresql' : 'postgresql://',
'oracle' : 'oracle://',
'mssql' : 'mssql+pyodbc://%s:%s@%s/%s',
'sqlite' : 'sqlite:///%s',
}
def __init__(self, stream):
self.stream = stream
self.url = self.getDBUrl()
self.engine = None
if self.url:
self.engine = create_engine(self.url, echo=True)

def getConnectionObject(self):
'''Based on the params provided return a database connection object.'''
conn = self.engine.connect()
return conn

def getDBUrl(self):
'''Generate a url from the auth params'''
method = getattr(self, 'get%sUrl' % (self.stream.configs['engine'].capitalize()))
url = method()
return url

def getColumns(self, table):
columns = []
for column in self.stream.configs['columns']:
columns.append(table.c[column])
return columns

def getTable(self):
meta = MetaData()
table = Table(self.stream.configs['table'], meta, autoload=True, autoload_with=self.engine)
return table

def execute(self):
result = []
table = None
if self.engine:
conn = self.getConnectionObject()
table = self.getTable()
columns = self.getColumns(table)
s = select(columns)
if 'limit' in self.stream.configs:
limit = self.stream.configs['limit']
s.limit(limit)
if 'window' in self.stream.configs and 'last' == self.stream.configs['window']:
countStmt = select([func.count(columns[0])])
count = conn.execute(countStmt).scalar()
s.offset(count-limit)
# print("Count is %d" % (count))
if 'asc' in self.stream.configs:
s.order_by(table.c[self.stream.configs['asc']].asc())
elif 'desc' in self.stream.configs:
s.order_by(table.c[self.stream.configs['desc']].desc())
resultset = conn.execute(s)
for row in resultset:
result.append(list(row))
# print("Result: %s" % (result))
return result

def getSqliteUrl(self):
url = self.DB_TYPE_TO_PREFIX_MAP[self.stream.configs['engine']] % (self.stream.configs['database'])
return url

def getMysqlUrl(self):
if 'username' not in self.stream.configs:
return None
if 'password' not in self.stream.configs:
return None
if 'host' not in self.stream.configs:
return None
if 'database' not in self.stream.configs:
return None
host = self.stream.configs['host']
if 'port' in self.stream.configs:
host += ':' + str(self.stream.configs['port'])
url = self.DB_TYPE_TO_PREFIX_MAP[self.stream.configs['engine']] % (self.stream.configs['username'], self.stream.configs['password'], host, self.stream.configs['database'])
return url

def getMssqlUrl(self):
url = None
if 'dsn' in self.stream.configs:
if 'username' in self.stream.configs and 'password' in self.stream.configs:
url = "mssql+pyodbc://%s:%s@%s" %(self.stream.configs['username'], self.stream.configs['password'], self.stream.configs['dsn'])
else:
url = "mssql+pyodbc://%s" % (self.stream.configs['dsn'])
elif 'username' in self.stream.configs and 'password' in self.stream.configs and 'host' in self.stream.configs and 'database' in self.stream.configs:
host = self.stream.configs['host']
if 'port' in self.stream.configs:
host += ':' + str(self.stream.configs['port'])
url = self.DB_TYPE_TO_PREFIX_MAP[self.stream.configs['engine']] % (self.stream.configs['username'], self.stream.configs['password'], host, self.stream.configs['database'])
return url
30 changes: 30 additions & 0 deletions leftronicd/contrib/dbretriever.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from twisted.python import log
from twisted.internet import defer
from database.databaseRetriever import DatabaseRetriever

class Stream(object):
pass

def get_page(handler, *args, **kwargs):
deferred = Deferred()
def callback(data):
output = h
deferred.callback(output)
return deferred

# def execute_db_retriever(engine, database, table, columns, **kwargs):
def execute_db_retriever(engine, database, **kwargs):
log.msg("Get db data is called with %s:%s" % (engine, database))
def callback():
headerRow = ['name', 'city', 'country']
dataRows = [ ['Lionel', 'Rosario', 'Argentina'], ['Andres', 'Albacete', 'Spain']]
stream = Stream()
stream.configs = dict(kwargs)
stream.configs['engine'] = engine
stream.configs['database'] = database
db = DatabaseRetriever(stream)
dataRows = db.execute()
result = { 'headerRow' : stream.configs['columns'], 'dataRows' : dataRows }
return result
return defer.execute(callback)

21 changes: 21 additions & 0 deletions mssql_sample.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
accesskey: <leftronic_access_key>
streams:
- method: leftronicd.contrib.dbretriever.execute_db_retriever
verbosename: DB Retriever
name: <stream_id>
type: <widget_type> # should be table
interval: <number in seconds between updates>
engine: mssql
dsn: <dsn
host: <database host> # do not specify if dsn is specified
username: <database username>
password: <database password>
port: <database connection port> # do not specify if dsn is specified
database: <database name> # do not specify if dsn is specified
table: <database table to read>
limit: <number of rows>
asc: <column name> # will take precedence if both asc and desc specified
columns:
- <column 1>
- <column 2>
- <column 3>
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
'requests==0.11.1',
'wsgiref==0.1.2',
'zope.interface==3.8.0',
'MySQL-python==1.2.3',
'SQLAlchemy==0.7.7',
'pyodbc==3.0.3',
]

try:
Expand Down

0 comments on commit 33670f3

Please sign in to comment.