-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_main.py
160 lines (114 loc) · 5.18 KB
/
test_main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import unittest
import pandas as pd
from pandas.io import sql
import datetime
from pathlib import Path
from sqlalchemy import MetaData, Table, Column, String, Integer, DateTime
from sqlalchemy import create_engine
from common.utils import restore_files, dateCETstr_to_tzdt
from dbconfig import (
test_db,
test_directories
)
from datasources.meff.meff_operations import (
update_closing_prices_day,
update_closing_prices_month
)
from datasources.omie.omie_operations import (
get_historical_hour_price,
update_latest_hour_price,
get_historical_energy_buy,
update_energy_buy,
update_historical_hour_price
)
from pipelines.energy_budget import (
pipe_hourly_energy_budget
)
from pipelines.omie_garantia import (
pipe_omie_garantia
)
import datetime
class MainIntegrationTest(unittest.TestCase):
    """Integration tests for datasource operations and pipelines.

    Each test runs a real operation against the database configured in
    ``test_db`` and asserts only on the integer status code the operation
    returns (0 == success, 1 == already inserted / nothing to do).
    """

    # Every table a test may create; dropped in tearDown so each test
    # starts from a clean database.
    _TABLES = (
        'omie_energy_buy',
        'omie_price_hour',
        'meff_precios_cierre_dia',
        'energy_buy_forecast',
    )

    @classmethod
    def setUpClass(cls):
        # One engine shared by all tests, pointing at the test database.
        cls.engine = create_engine(test_db['dbapi'])

    def setUp(self):
        # TODO use session and rollback
        pass

    def tearDown(self):
        # Drop everything a test may have created so tests stay independent.
        for table in self._TABLES:
            sql.execute(f'DROP TABLE IF EXISTS {table}', self.engine)

    def create_historical(self, reading_date: str):
        """Create ``omie_price_hour`` holding a single dummy reading.

        :param reading_date: date string interpreted in CET (see
            ``dateCETstr_to_tzdt``); used as both the reading date and the
            request time of the inserted row.
        """
        metadata = MetaData(bind=self.engine)
        omie_price_hour = Table(
            'omie_price_hour',
            metadata,
            Column('date', DateTime(timezone=True)),
            Column('price', Integer),
            Column('request_time', DateTime(timezone=True)),
        )
        # NOTE(review): relies on bound MetaData (deprecated in SQLAlchemy
        # 1.4, removed in 2.0) — pass bind= explicitly if the project
        # upgrades SQLAlchemy.
        omie_price_hour.create()
        with self.engine.connect() as conn:
            ins = omie_price_hour.insert().values(
                date=dateCETstr_to_tzdt(reading_date),
                price=100,
                request_time=dateCETstr_to_tzdt(reading_date))
            conn.execute(ins)

    def create_datasources_tables(self):
        """Load the CSV fixtures from testdata/inputdata into the test DB.

        Populates the four datasource tables the pipelines read from,
        replacing any existing contents.
        """
        meff_df = pd.read_csv('testdata/inputdata/meff_precios_cierre_dia.csv', parse_dates=['dia','request_time'])
        # The real table stores 'dia' as a DATE, not a timestamp.
        meff_df['dia'] = meff_df['dia'].dt.date
        meff_df.to_sql('meff_precios_cierre_dia', con=self.engine, if_exists='replace', index=False)
        energy_buy_forecast_df = pd.read_csv('testdata/inputdata/energy_buy_forecast.csv', parse_dates=['date','request_time'])
        energy_buy_forecast_df.to_sql('energy_buy_forecast', con=self.engine, if_exists='replace', index=False)
        omie_energy_buy_df = pd.read_csv('testdata/inputdata/omie_energy_buy.csv', parse_dates=['date','request_time'])
        omie_energy_buy_df.to_sql('omie_energy_buy', con=self.engine, if_exists='replace', index=False)
        omie_price_hour_df = pd.read_csv('testdata/inputdata/omie_price_hour.csv', parse_dates=['date','request_time'])
        # Hour prices are timezone-aware in Europe/Madrid, matching what
        # the OMIE operations write.
        omie_price_hour_df['date'] = pd.to_datetime(omie_price_hour_df['date'], utc=True).dt.tz_convert('Europe/Madrid')
        omie_price_hour_df.to_sql('omie_price_hour', con=self.engine, if_exists='replace', index=False)

    def test__get_historical_energy_buy(self):
        """Historical PDBC files load successfully into omie_energy_buy."""
        omie_dir = Path(test_directories['OMIE_HISTORICAL_PDBC']).resolve()
        result = get_historical_energy_buy(self.engine, omie_dir, verbose=2)
        self.assertEqual(result, 0)

    def test__update_energy_buy(self):
        """Pending PDBC files in the temp directory update omie_energy_buy."""
        omie_dir = Path(test_directories['OMIE_TEMP_PDBC']).resolve()
        result = update_energy_buy(self.engine, omie_dir, verbose=2)
        # update_energy_buy moves the processed files; put them back so the
        # test is repeatable.
        # TODO move this to tearDown maybe? or catch finally ?
        restore_files(omie_dir)
        self.assertEqual(result, 0)

    def test__pipe_hourly_energy_budget(self):
        """The hourly energy budget pipeline succeeds on the CSV fixtures."""
        self.create_datasources_tables()
        result = pipe_hourly_energy_budget(self.engine)
        self.assertEqual(result, 0)

    @unittest.skip('Needs dummy tables in create_datasources_tables()')
    def test__pipe_omie_garantia(self):
        # TODO: Add dummy tables to create_datasources_tables()
        #self.create_datasources_tables()
        result = pipe_omie_garantia(self.engine)
        self.assertEqual(result, 0)

    # TODO mock web call so we can always run this tests
    @unittest.skip('Test requires web access to omie. Also needs fixing :P')
    def test__update_historical_hour_price(self):
        """A past date missing from the table is fetched and inserted."""
        self.create_historical('20220105')
        result = update_historical_hour_price(self.engine, verbose=3)
        self.assertEqual(result, 0)

    # TODO mock web call so we can always run this tests
    @unittest.skip('Test requires web access to omie. Also needs fixing :P')
    def test__update_historical_hour_price__already_inserted(self):
        """A date already present in the table is not fetched again."""
        self.create_historical('22220202')
        result = update_historical_hour_price(self.engine, verbose=3)
        self.assertEqual(result, 1)