XML files processor optimization
The XML files are small (~100 kB per file), but the ingest, transform, and dump-to-database pipeline is extremely slow.
Two improvements can be made to the code:
Because of the SparkContext serialization error described below (SPARK-5063), using psycopg2 to connect to PostgreSQL across nodes is not easy. Originally, entries in the psql database were updated by extracting DataFrame information as plain strings and saving it to a dictionary:
def update_patient_info(info, key):
    # <sql_queries>
    ...

info = {'stage': xml_schema.first()['shared_stage:stage_event']['shared_stage:pathologic_stage']._VALUE,
        'primary_site': xml_schema.first()['clin_shared:tumor_tissue_site']._VALUE,
        'gender': xml_schema.first()['shared:gender']._VALUE}
update_patient_info(info, key=f.caseid)
Each extraction is a separate Spark action (one xml_schema.first() call per field), so it is treated as its own task and takes ~1 s.
Total time for processing 100 files: 863.7245299816132s
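For reference, a minimal sketch of what this driver-side, one-row-at-a-time update might look like; the table name, column names, and connection parameters are illustrative, not the project's actual schema or queries:

```python
import psycopg2

def update_patient_info(info, key):
    # Hypothetical body: one connection and one single-row UPDATE per file.
    conn = psycopg2.connect(host="localhost", dbname="clinical", user="postgres")  # illustrative
    cur = conn.cursor()
    cur.execute(
        "UPDATE patients SET stage = %s, primary_site = %s, gender = %s WHERE case_id = %s",
        (info['stage'], info['primary_site'], info['gender'], key),
    )
    conn.commit()
    conn.close()
```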
As suggested by this question, map each partition to a psql-interfacing method and connect to psql with psycopg2 inside each partition:
def update_patient_info(rows):
    import psycopg2
    conn = psycopg2.connect(tokens)   # tokens: connection parameters
    cur = conn.cursor()
    for row in rows:
        cur.execute(query)
    conn.commit()
    ...

xml_schema_rdd.foreachPartition(update_patient_info)
(Make sure psycopg2 is available on the worker nodes.)
Total time for processing 100 files: 367.3962197303772s
Replacing the row-by-row execute() with a single executemany() call:
cur.executemany(query, rows)
Total time for processing 100 files: 356.623165845871s
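A fuller sketch of the per-partition writer with executemany(); the connection parameters, SQL statement, and row-to-parameter mapping are assumptions for illustration, assuming each partition element already carries stage, primary_site, gender, and case_id fields:

```python
UPDATE_SQL = (
    "UPDATE patients SET stage = %s, primary_site = %s, gender = %s "
    "WHERE case_id = %s"
)  # illustrative statement

def write_partition(rows):
    # Runs once per partition on an executor; import psycopg2 there so no
    # driver-side module state is needed.
    import psycopg2
    conn = psycopg2.connect(host="db-host", dbname="clinical", user="postgres")  # illustrative
    cur = conn.cursor()
    params = [(r['stage'], r['primary_site'], r['gender'], r['case_id']) for r in rows]
    # executemany() issues all statements in one call, but psycopg2 still sends
    # them to the server one by one, so the gain over a plain loop is modest.
    cur.executemany(UPDATE_SQL, params)
    conn.commit()
    conn.close()

xml_schema_rdd.foreachPartition(write_partition)
```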
Or batching with psycopg2.extras.execute_batch():
from psycopg2 import extras
extras.execute_batch(cur, query, rows)
(The submodule has to be imported in each partition.)
Total time for processing 100 files: 374.7439913749695s
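The execute_batch() variant is the same writer with the executemany() call swapped out; execute_batch() concatenates statements into pages to cut round trips, and page_size (default 100) controls how many statements go per page. A compact sketch, with the same illustrative assumptions as above:

```python
def write_partition_batched(rows):
    # Same writer, with execute_batch() instead of executemany().
    # The submodule is imported inside the function so it resolves on each executor.
    import psycopg2
    from psycopg2 import extras
    conn = psycopg2.connect(host="db-host", dbname="clinical", user="postgres")  # illustrative
    cur = conn.cursor()
    params = [(r['stage'], r['primary_site'], r['gender'], r['case_id']) for r in rows]
    extras.execute_batch(
        cur,
        "UPDATE patients SET stage = %s, primary_site = %s, gender = %s WHERE case_id = %s",
        params,
        page_size=100,
    )
    conn.commit()
    conn.close()
```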
- Never extract information from the DataFrame/RDD row by row on the driver.
- Batch execution isn't helping much here.
The batch optimization above didn't help much because the files are processed one by one, so only one row of the RDD is sent to the database at a time.
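To make that concrete, here is a rough sketch of the per-file flow, reusing the write_partition() sketch above (the reader options, rowTag, and file attributes are illustrative); every partition handed to the writer contains a single row, so there is nothing to batch:

```python
# Hypothetical per-file loop: each XML file becomes its own one-row DataFrame.
for f in file_list:
    df = (spark.read.format('xml')                # spark-xml
                    .option('rowTag', 'patient')  # illustrative rowTag
                    .load(f.path))                # one row per file
    df.rdd.foreachPartition(write_partition)      # the partition holds exactly one row
```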
Since the following is not achievable:
def xml_reader(f):
    df.select()
    ...

def csv_reader(f):
    spark.write.format('jdbc')
    ...

readers = {'XML': xml_reader, 'CSV': csv_reader}
files_df.foreachPartition(readers[files_df.filetype])
Because of:
_pickle.PicklingError: Could not serialize object:
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation.
SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
We need another way around this.
def xml_reader(f):
    df.select()
    ...

def csv_reader(f):
    spark.write.format('jdbc')
    ...

readers = {'XML': xml_reader, 'CSV': csv_reader}
flist = df.rdd.collect()
list(map(lambda x: readers[x.filetype](x), flist))
Total time for processing 100 files: (Similar to for-loop, not parallelized at all)
Trying to parallelize the driver-side loop with multiprocessing:
from collections import deque
from multiprocessing import Pool

pool = Pool(processes=8)
deque(pool.map(lambda x: inner(spark, x), filelist))
Returns:
AttributeError: Can't pickle local object 'process_xml.<locals>.<lambda>'
Extract the file list as an RDD and use mapPartitions() to parse the files. Connect to Amazon S3 with boto3 and parse the XML with Python's built-in xml.etree (instead of spark.read and spark-xml).
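A minimal sketch of this approach, assuming each row of the file list carries an S3 key and a case id; the bucket name, tag names, and the shape of the returned records are illustrative:

```python
import xml.etree.ElementTree as ET

def parse_partition(files):
    # One boto3 client per partition; everything here runs on the executor,
    # so no SparkContext or spark.read is needed.
    import boto3
    s3 = boto3.client('s3')
    for f in files:
        body = s3.get_object(Bucket='my-bucket', Key=f.key)['Body'].read()  # illustrative bucket/key
        root = ET.fromstring(body)
        # findtext() with the '{*}' wildcard (Python 3.8+) ignores XML namespaces;
        # the tag names are placeholders for the real clinical schema.
        yield {
            'case_id': f.caseid,
            'stage': root.findtext('.//{*}pathologic_stage'),
            'primary_site': root.findtext('.//{*}tumor_tissue_site'),
            'gender': root.findtext('.//{*}gender'),
        }

parsed = files_df.rdd.mapPartitions(parse_partition)
```

Each file is now just a small blob streamed from S3 and parsed locally on the executor, instead of a separate spark.read job.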
Total time for processing 1080 files: 107.45448088645935s
Now that each partition holds multiple rows, we can try
from psycopg2 import extras
extras.execute_batch(cur, query, rows)
again.
Total time for processing 1080 files: 88.00022768974304s
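Putting it together, under the same assumptions, the final pipeline is roughly the parse sketch feeding the batched writer sketched earlier:

```python
# Hypothetical end-to-end wiring (parse_partition and write_partition_batched
# are the sketches above): parse each partition with boto3 + ElementTree,
# then push each parsed partition to PostgreSQL with execute_batch().
parsed = files_df.rdd.mapPartitions(parse_partition)
parsed.foreachPartition(write_partition_batched)
```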
Processing is now ~100 times faster per file.
- Parallelize only what needs to be parallelized when there is a conflict.
- boto3 and psycopg2 are now the bottleneck.
- HDFS and Hive might be supported better: no Internet connection would be required.