
Large queries with WMArchive #352

Open
llayer opened this issue Mar 5, 2019 · 6 comments

Comments

@llayer

llayer commented Mar 5, 2019

Dear @vkuznet ,

For my project on AI error logs with @vlimant I now need to locate a large number of
log files: I have ~25,000 workflows (https://vocms0113.cern.ch/actionshistory) and for each
of them I need to locate the logs.
I managed to write a spec file for each of the workflows that contains the task name and the
time range (with the exact day when it was run), e.g.

{"spec":{"task":"/vlimant_ACDC0_task_HIG-RunIIFall17wmLHEGS-01415__v1_T_180706_002124_986/HIG-RunIIFall17DRPremix-02001_1/HIG-RunIIFall17DRPremix-02001_1MergeAODSIMoutput/HIG-RunIIFall17MiniAODv2-01299_0", "timerange":[20180706,20180706]}, "fields":[]}

With this I am able to locate the unmerged jobs.
However, for one task this query runs for about ~10 seconds, so if I create 25,000 spec files and run
a query for each of them it will take a very long time. I would therefore like to know what the most
efficient way to locate that many files is. Can I somehow make one large query that efficiently
returns all the files that I need, or give a finer time range to speed up the query?
It would be great to know what possibilities exist for this problem.

Many, many thanks for your help!

@vkuznet
Contributor

vkuznet commented Mar 5, 2019

Lukas,
I think you should approach this kind of problem differently. Instead of firing N jobs with individual tasks and time ranges, you can do the reverse, i.e. run a single job over a large time range, extract the workflows and then look up their log archives. Here I assume that you don't really care about specific tasks and will rather take whatever your job scans in the specified time range.

First, you need to clearly understand the procedure which is outlined here.
Then, you probably need to adjust LogFinder.py (or write a new one) to adapt it to your conditions. The current code throws away all records which do not match the conditions, but if you do a full scan you will collect and yield those records instead. In your case you may pass a list of workflows to the Spark job and keep the records which match the list. I think you'll need to put these 25k workflows on HDFS so that you can create a DataFrame, and then join this DataFrame with the WMArchive records to extract the matching subset of the latter to proceed.
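
Something along these lines could serve as a starting point (only a rough sketch: the paths, the avro reader package and the assumption that the records expose a top-level 'task' column all need to be adapted to the real WMArchive setup):

# only a sketch: paths, the avro package and the 'task' column are assumptions
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("wmarchive-bulk-lookup").getOrCreate()

# the 25k workflow/task names, one per line, previously copied to HDFS
tasks = spark.read.text("hdfs:///user/you/tasks.txt").withColumnRenamed("value", "task")

# WMArchive FWJR records for the chosen time range (avro files on HDFS)
records = spark.read.format("com.databricks.spark.avro") \
               .load("hdfs:///cms/wmarchive/avro/2018/07/*")

# keep only the records whose task appears in the list, then extract what you need
matched = records.join(tasks, on="task", how="inner")
matched.write.json("hdfs:///user/you/matched_records")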

@llayer
Author

llayer commented Mar 6, 2019

Hi @vkuznet,

Thank you very much for your suggestion. I looked into the code that you indicated, but
I am not so familiar with PySpark, so I would like to make sure that I understand your suggestion
correctly before I go on. As an exercise, I am trying to locate the records for 25,000 workflows
(like with the RecordFinder script), since this is a bit easier than locating the logs.

So if I understand correctly, in myspark.py the locating of records is done with the filter function,

mro = obj.MapReduce(spec)
records = avro_rdd.filter(mro.mapper).collect()

where avro_rdd is an RDD that contains the candidate records on HDFS (depending on the time range).
Then you filter these records with a function that implements your specific needs, e.g. matching
the 'lfn' key of the spec file. The MapReduce class with this mapper function
is implemented by scripts like LogFinder, RecordFinder, etc.

What will be different for me is that, instead of only one workflow in a spec file,
I will have multiple workflows which I load from HDFS into a second RDD or Spark DataFrame.
Then I have two RDDs, and on these two RDDs I can apply a function
(like the join that you mentioned) that
returns the subset of the avro_rdd records matching my 25k workflows, so that I can, for example,
extract the subset with a matching 'task' and then store the information from this subset
that I need.
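
If that is the right idea, something like this rough sketch is what I have in mind (field names and paths are just placeholders, and I assume avro_rdd yields the record dictionaries directly, as in the filter call above):

# rough sketch only: paths and field names are placeholders

# the 25k task names, one per line, copied to HDFS beforehand
tasks_rdd = sc.textFile("hdfs:///user/llayer/actionshistory_tasks.txt") \
              .map(lambda t: (t.strip(), None))            # key by task name

# key the WMArchive records by their 'task' field as well
records_by_task = avro_rdd.map(lambda rec: (rec['task'], rec))

# an inner join keeps only the records whose task appears in my 25k list
matched = records_by_task.join(tasks_rdd) \
                         .map(lambda kv: kv[1][0])          # keep only the record

matched.saveAsTextFile("hdfs:///user/llayer/matched_records")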

So do I understand this correctly?

Many, many thanks again!!

@vkuznet
Contributor

vkuznet commented Mar 6, 2019 via email

@llayer
Author

llayer commented Mar 12, 2019

Hi Valentin,

I have a first version of the script that performs the task mentioned above.
I committed a first experimental version to my GitHub, maybe you can have a look;
at the moment I just adapted your myspark.py script to make it run somehow
(the paths are still hardcoded). See the function 'runActionsHistoryQuery':

https://github.com/llayer/WMArchive/blob/master/src/python/WMArchive/Tools/myspark.py

Here is a short description of what I did (a rough code sketch of both steps follows after the lists):

First step: locate the logArch tarballs

  1. copy the list of failing task names to HDFS and create a new RDD
  2. filter the avro_rdd data for failing jobs
  3. join the filtered frame and the failing tasks on the key 'task'

Second step: locate the logCollect tarballs

  1. filter the avro_rdd data for logCollect tasks
  2. use flatMap to make key-value pairs with the logArch tarballs as keys
  3. join with the frame from the first step using the logArch tarballs as keys,
    so that every logArch tarball has an associated logCollect tarball
  4. write the results to HDFS
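
In simplified code the two steps look roughly like this (just a sketch with placeholder field and path names, not the exact code in the commit):

# sketch of the two steps; 'status', 'jobtype' and the *_lfn fields are placeholders

# --- first step: logArch tarballs of the failing tasks ---
failing_tasks = sc.textFile("hdfs:///user/llayer/failing_tasks.txt") \
                  .map(lambda t: (t.strip(), None))

log_arch = avro_rdd.filter(lambda rec: rec['status'] != 'success') \
                   .map(lambda rec: (rec['task'], rec)) \
                   .join(failing_tasks) \
                   .map(lambda kv: kv[1][0])

# --- second step: associate every logArch tarball with its logCollect tarball ---
log_collect = avro_rdd.filter(lambda rec: rec['jobtype'] == 'LogCollect') \
                      .flatMap(lambda rec: [(lfn, rec['logCollect_lfn'])
                                            for lfn in rec['input_logArch_lfns']])

result = log_arch.map(lambda rec: (rec['logArch_lfn'], rec)) \
                 .join(log_collect)

result.saveAsTextFile("hdfs:///user/llayer/logarch_to_logcollect")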

I managed to run the code over larger time ranges (~3 months) in about 4-5 hours;
the join functions take quite some time.

There are two small things:

  1. For some dates there are no WMArchive paths on HDFS, so the code crashes.
    I added a check to make sure that the paths exist.

  2. At some point I run into a problem with the schema, which is not compatible with the old
    tasks. I will now check at which point this happens.

Many, many thanks for all of your help.
Best,
Lukas

@llayer
Author

llayer commented Mar 13, 2019

Edit: the problem with the schema can be fixed by loading the old schema from the path
hdfs:///cms/wmarchive/avro/schemas/current.avsc.[date_of_change]
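
For reference, this is roughly how I pick the schema file now (a sketch; the helper name is made up and the date comparison is simplified):

# sketch: pick the schema that was valid for a given data date (YYYYMMDD)
import subprocess

SCHEMA_DIR = "hdfs:///cms/wmarchive/avro/schemas"

def schema_for(date):
    out = subprocess.check_output(["hdfs", "dfs", "-ls", "-C", SCHEMA_DIR]).decode()
    # files look like .../current.avsc.<date_of_change>; a schema saved on date X
    # was used for data taken before X, so take the earliest change date >= the data date
    dated = sorted(p for p in out.split() if p.rsplit(".", 1)[-1].isdigit())
    for path in dated:
        if int(path.rsplit(".", 1)[-1]) >= int(date):
            return path
    return SCHEMA_DIR + "/current.avsc"   # newest data: use the current schema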

@vkuznet
Contributor

vkuznet commented Mar 13, 2019 via email
