This repository has been archived by the owner on Sep 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
process.py
118 lines (99 loc) · 3.5 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions
import matplotlib.pyplot as plt
from datetime import date, timedelta
# Reference date for this run; the per-day database names used below are
# derived from offsets relative to it.
now = date.today()
def get_spark_session(env, appName):
    """Return a SparkSession configured with the SQLite JDBC driver package.

    Args:
        env: Value passed to the builder's ``master()`` (e.g. ``'local'``).
        appName: Value passed to the builder's ``appName()``.

    Returns:
        The session returned by the builder's ``getOrCreate()``.
    """
    session_builder = SparkSession.builder
    session_builder = session_builder.master(env)
    session_builder = session_builder.appName(appName)
    # Request the SQLite JDBC driver so 'jdbc:sqlite:' URLs can be read.
    session_builder = session_builder.config(
        'spark.jars.packages', 'org.xerial:sqlite-jdbc:3.41.2.1'
    )
    return session_builder.getOrCreate()
# Single local session shared by the rest of the script.
spark = get_spark_session('local', 'demo spark')
# Run a trivial query and print its result.
spark.sql('SELECT current_date').show()
# SQLContext built on the session's SparkContext; used below for the JDBC
# reads and for every SQL statement.
# NOTE(review): PySpark documents SQLContext as deprecated since 3.0 in favour
# of SparkSession - consider using `spark` directly; confirm before changing.
sql_c = SQLContext(spark.sparkContext)
def extract_db(uri, table='status'):
    """Load one table of a SQLite database through Spark's JDBC reader.

    Args:
        uri: JDBC URL of the database, e.g.
            ``'jdbc:sqlite:2023-09-01-data.db'``.
        table: Name of the table to read. Defaults to ``'status'``.

    Returns:
        The DataFrame produced by the JDBC reader, or ``None`` when the
        table could not be loaded. Failures are reported on stdout.
    """
    try:
        return sql_c.read.format('jdbc') \
            .options(driver='org.sqlite.JDBC', dbtable=table, url=uri) \
            .load()
    except Exception as exc:
        # Best effort: a day without a usable database must not abort the
        # caller. `Exception` (rather than a bare `except`) lets
        # KeyboardInterrupt and SystemExit propagate.
        print("cannot load db at {}".format(uri))
        # Only the first line of the error: Spark/JVM errors can carry a
        # full stack trace in their message.
        detail_lines = str(exc).strip().splitlines()
        summary = detail_lines[0] if detail_lines else type(exc).__name__
        print("  reason: {}".format(summary))
        return None
# The processed window: one SQLite database per day, from 2 to 8 days before
# `now` (7 days, most recent first).
window_dates = [now - timedelta(days=k) for k in range(2, 9)]


def _union_view_sql(view_name, day_numbers):
    """Return the SQL defining `view_name` as the UNION of its per-day views.

    The per-day views are named ``<view_name>_<day>``.
    """
    selects = [f" SELECT * FROM {view_name}_{day}" for day in day_numbers]
    return (
        f"CREATE OR REPLACE TEMPORARY VIEW {view_name} as ("
        + " UNION ".join(selects)
        + ")"
    )


# --- 'status' table: one DataFrame and one temp view per day ---------------
# Keyed by day of month, which is unique within a 7-day window.
data_dict = {}
loaded_dates = []  # dates whose database could be read, in window order
for currentDate in window_dates:
    dburi = f'jdbc:sqlite:{currentDate}-data.db'
    print('extracting {} with day={} db-uri={}'.format(currentDate, currentDate.day, dburi))
    status_df = extract_db(dburi)
    if status_df is None:
        # extract_db reports the failure and returns None; skip the day
        # instead of failing later on `None.createOrReplaceTempView`.
        continue
    data_dict[currentDate.day] = status_df
    loaded_dates.append(currentDate)

if not loaded_dates:
    # An empty UNION would be invalid SQL; stop with a clear message.
    raise SystemExit('no status database could be loaded for the processed window')

loaded_days = [loaded.day for loaded in loaded_dates]

for i in loaded_days:
    data_dict[i].createOrReplaceTempView(f"status_data_{i}")

# Single view over all the days that were loaded.
query = _union_view_sql("status_data", loaded_days)
sql_c.sql(query)

# --- 'statusConso' table: same days, enriched with date parts --------------
for currentDate in loaded_dates:
    i = currentDate.day
    dburi = f'jdbc:sqlite:{currentDate}-data.db'
    # `customSchema` makes the reader expose the `date` column as a string.
    data_dict[i] = (
        sql_c.read.format('jdbc')
        .options(driver='org.sqlite.JDBC', dbtable='statusConso', url=dburi)
        .option("customSchema", "date STRING")
        .load()
    )
    # Split `date` into the parts used by the final query (filtering on
    # `minute`, ordering by `day` and `hour`).
    (
        data_dict[i]
        .withColumn('day', functions.dayofmonth('date'))
        .withColumn('month', functions.month('date'))
        .withColumn('year', functions.year('date'))
        .withColumn('hour', functions.hour('date'))
        .withColumn('minute', functions.minute('date'))
        .createOrReplaceTempView(f"status_conso_data_{i}")
    )

query = _union_view_sql("status_conso_data", loaded_days)
sql_c.sql(query)
# Define the temporary view `final_status_data`: the `status_conso_data`
# columns joined to the `status_data` rows whose code is '10107'.
final_view_sql = """CREATE OR REPLACE TEMPORARY VIEW final_status_data as (
SELECT status_conso_data.* FROM status_data LEFT JOIN status_conso_data on status_conso_data.id = status_data.idConso
where status_data.code = '10107'
)"""
sql_c.sql(final_view_sql)
# One row per record taken at minute 0, with the bike/dock counts and their
# ratio, ordered by day then hour.
hourly_sql = """select id,
day, hour, minute,
nbBike as bikes,
nbEbike as ebikes,
nbEDock as slots,
((nbBike + nbEbike) / nbEDock) as availability
from final_status_data
where minute = '0'
order by day, hour asc
"""
hourly_rows = sql_c.sql(hourly_sql)
# Bring the result into pandas for plotting.
df = hourly_rows.toPandas()
# Plot one availability curve per day of the window and save it to export.png.
plt.figure(figsize=(14, 9))
# Legend labels (French weekday names), indexed by date.weekday() (Monday == 0).
days = ['Lundi', 'Mardi', 'Mercredi', 'Jeudi', 'Vendredi', 'Samedi', 'Dimanche']
for k in range(2, 9):
    currentDate = now - timedelta(days=k)
    print(currentDate)
    # Select the day's rows once instead of filtering twice.
    day_rows = df.loc[df['day'] == currentDate.day]
    if day_rows.empty:
        # Nothing recorded for that day: avoid an empty legend entry.
        continue
    plt.plot(
        day_rows['hour'].to_numpy(),
        # `availability` is computed as a plain ratio; scale it so the values
        # match the "(%)" unit of the y-axis label.
        day_rows['availability'].astype(float).to_numpy() * 100,
        label=days[currentDate.weekday()],
    )
plt.xlabel("Heures")
plt.ylabel("Disponibilité (%)")
plt.legend()
plt.savefig('export.png')