Large queries with WMArchive #352
Comments
Lukas, first you need to clearly understand the procedure, which is outlined here.
Lukas,
yes, your observation is correct, but you should be *very* careful with the collect
call. Keep in mind that the driver process will need to load into its RAM
all the stuff you want to collect. Therefore, if you collect 25k JSON docs,
I doubt it will succeed.
Instead, you need to write your found records back to HDFS. For example,
https://github.com/dmwm/CMSSpark/blob/master/src/python/CMSSpark/dbs_events.py#L75
shows how I write data back to HDFS.
The rest I think you got right: get your data, join it with your workflows,
apply filters, then write back to HDFS. You can then fetch these data from HDFS
by copying them with the `hadoop fs -get hdfs:///path ./local/path` command.
Best,
Valentin.
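As a rough sketch of this approach: assuming `avro_rdd` and `mro` are set up as in the myspark.py snippet quoted below, the filtered records can be serialized and written straight to HDFS instead of being collected on the driver (the output path is a placeholder):
```
import json

# Keep the records selected by the user-supplied mapper, as in myspark.py.
matched = avro_rdd.filter(mro.mapper)

# Serialize each record to JSON and write the result back to HDFS;
# saveAsTextFile produces one part file per partition, so nothing is
# pulled into the driver's memory.
matched.map(lambda rec: json.dumps(rec)) \
       .saveAsTextFile("hdfs:///path/to/output/matched_records")
```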
…llayer wrote:
Hi @vkuznet,
Thank you very much for your suggestion. I looked into the code that you indicated but
I am not so familiar with pyspark, so I would like to make sure that I understand your suggestion
correctly before I go on. As an exercise, I am trying to locate the records for 25,000 workflows
(like with the RecordFinder script), since this is a bit easier than locating the logs.
So if I understand correctly, in [myspark.py](https://github.com/dmwm/WMArchive/blob/master/src/python/WMArchive/Tools/myspark.py) the locating of records is done with the filter function,
```
mro = obj.MapReduce(spec)
records = avro_rdd.filter(mro.mapper).collect()
```
where avro_rdd is an RDD that contains the possible records on hdfs (depending on the time range).
Then you filter these records with a function that implements your specific needs, e.g. match
the 'lsf' key of the spec file. The MapReduce class with this mapper function
is implemented by scripts like LogFinder, RecordFinder, etc.
So what will be different for me is that, instead of only one workflow in a spec file,
I will have multiple workflows, which I will load from hdfs into a second RDD or Spark DataFrame.
Then I have two RDDs, and with these two RDDs I can apply another function that
returns the subset of the avro_rdd records matched against my 25k workflows; for example, I can
extract the subset with a matching 'task' and then store the information I need
from this subset.
So do I understand this correctly?
Many, many thanks again!!
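As a rough illustration of this two-RDD idea (a sketch only: it assumes `sc` is the SparkContext, that each avro_rdd record is a Python dict with a top-level 'task' field, and that the 25k task names sit in a plain text file on HDFS, one per line; the path is a placeholder):
```
# Key the archive records by their task name.
records_by_task = avro_rdd.map(lambda rec: (rec.get('task', ''), rec))

# Key the failing workflows/tasks by the same key (dummy value 1).
failing_tasks = sc.textFile("hdfs:///path/to/failing_tasks.txt") \
                  .map(lambda line: (line.strip(), 1))

# The inner join keeps only records whose 'task' appears in the failing list.
matched = records_by_task.join(failing_tasks) \
                         .map(lambda kv: kv[1][0])  # drop key and dummy value
```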
Edit: the problem with the schema can be fixed by loading the old schema from the path.
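For reference, one way to read old records with an explicit (older) schema is via the spark-avro data source, if it is available on the cluster; this is only a sketch, it assumes a SparkSession `spark`, and the schema and data paths are placeholders:
```
# Load the old Avro schema (a .avsc JSON file) from HDFS ...
schema_json = spark.sparkContext.wholeTextFiles(
    "hdfs:///path/to/old/fwjr_schema.avsc").values().first()

# ... and pass it as the reader schema so old records deserialize correctly.
old_records = spark.read.format("avro") \
    .option("avroSchema", schema_json) \
    .load("hdfs:///path/to/wmarchive/avro/")
```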
Lukas,
it is nice that you put the code together, but I would advise you not to modify
myspark.py, since by design it allows working with user-based classes.
Instead, take your code
https://github.com/llayer/WMArchive/blob/master/src/python/WMArchive/Tools/myspark.py#L428-L511
and put it into a new file, stored in the PySpark area.
Then create your class and try to load it into the myspark code.
See how it is done now on line
https://github.com/llayer/WMArchive/blob/master/src/python/WMArchive/Tools/myspark.py#L304
So if you pass your script (your code), myspark will try to load it.
Then you can adjust the logic in myspark to run your class. Moreover, the existing
run function in myspark allows you to pass specs, so you can pass to your code
all the hardcoded values you have now.
This way you will not duplicate existing code (as you do now), you will be able
to pass external parameters, and we can merge your changes.
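For illustration, such a user class could look roughly like the sketch below; the interface is only inferred from the `mro = obj.MapReduce(spec)` / `avro_rdd.filter(mro.mapper)` usage quoted earlier, and the spec keys and selection logic are placeholders:
```
class MapReduce(object):
    """User class that myspark can load from a separate script file."""
    def __init__(self, spec=None):
        # All formerly hardcoded values arrive through the spec, e.g. the
        # HDFS location of the failing-task list (key name is a placeholder).
        self.spec = spec or {}
        self.tasks_path = self.spec.get('tasks_path', 'hdfs:///path/to/failing_tasks.txt')

    def mapper(self, record):
        """Predicate used by avro_rdd.filter: keep only interesting records."""
        task = record.get('task', '')
        return '/LogCollect' in task  # placeholder selection logic
```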
Regarding your workflow, it seems reasonable to me.
I'm not surprised that the code runs for a long time, since we have lots of data in
WMArchive.
You should keep in mind that gaps are allowed, since we may have gaps in
data-operations too. WMArchive accumulates ~50K records per file, and file
creation may span several days if data-operations are running at a low rate.
As you found, we also have different schemas, and old records should be read with
the older schema.
Best,
Valentin.
…llayer wrote:
Hi Valentin,
I have a first version of the script that does the task mentioned above.
I committed a first experimental version to my GitHub; maybe you can have a look.
At the moment I just used your myspark.py script to make it run somehow
(the paths are still hardcoded). See the function 'runActionsHistoryQuery':
https://github.com/llayer/WMArchive/blob/master/src/python/WMArchive/Tools/myspark.py
I shortly describe what I did:
First step: locate the logArch tarballs
1. copy list of failing task names to hdfs and create new rdd
2. filter the avro_rdd data for failing jobs
3. join the filtered frame and the failing tasks with the key 'task'
Second step: locate the logCollect tarballs
1. filter the avro_rdd data for logCollect tasks
2. use flat_map to make key-value pairs with logArch tarballs as key
3. join with the frame from the first step using logArch tarballs as key,
so that for every logArch tarball there is an associated logCollect
4. write results to hdfs
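Roughly, the second step could be sketched as below (an illustration only: `first_step` stands for the (logArch tarball, failing-job record) pairs produced in the first step, and `logarch_lfns` is a hypothetical helper for pulling the tarball LFNs out of a record):
```
import json

def logarch_lfns(rec):
    """Hypothetical helper: return the logArch tarball LFNs found in a record."""
    return [lfn for lfn in rec.get('LFNArray', []) if 'logArchive' in lfn]

# (logArch tarball, logCollect record) pairs, one pair per tarball in the record.
logcollect_by_tarball = avro_rdd \
    .filter(lambda rec: '/LogCollect' in rec.get('task', '')) \
    .flatMap(lambda rec: [(lfn, rec) for lfn in logarch_lfns(rec)])

# Attach the matching logCollect info to every tarball found in the first step,
# then write the result back to HDFS (placeholder path).
first_step.join(logcollect_by_tarball) \
          .map(lambda kv: json.dumps({'logArch': kv[0],
                                      'logCollectTask': kv[1][1].get('task', '')})) \
          .saveAsTextFile("hdfs:///path/to/output")
```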
I managed to run the code over larger time ranges (~3 months) in about 4-5 hours;
the join functions take quite some time.
There are two small things:
1. For some dates there are no WMArchive paths on the hdfs, so the code crashes.
I added a check to make sure that the paths exist (see the sketch after this list).
2. At some point I run into the problem that the schema is not compatible with the old
tasks. I will now check at which point this happens.
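A simple way to do such a check (a sketch; paths are placeholders, and this is not necessarily how the linked script does it) is to test each candidate path with the hadoop CLI before handing the list to Spark:
```
import subprocess

def hdfs_path_exists(path):
    """Return True if the HDFS path exists (hadoop fs -test -e returns 0)."""
    return subprocess.call(['hadoop', 'fs', '-test', '-e', path]) == 0

# Keep only the daily WMArchive paths that actually exist.
candidates = ['hdfs:///path/to/wmarchive/2018/07/%02d' % day for day in range(1, 32)]
existing = [p for p in candidates if hdfs_path_exists(p)]
```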
Many, many thanks for all of your help.
Best,
Lukas
Dear @vkuznet,
for my project on AI error-logs with @vlimant I now need to locate a large number of
log files, i.e. I have ~25,000 workflows (https://vocms0113.cern.ch/actionshistory) and for each
of them I need to locate the logs.
I managed to write, for each of the workflows, a spec file that contains the task name and the
time range (with the exact day when it was run), e.g.
{"spec":{"task":"/vlimant_ACDC0_task_HIG-RunIIFall17wmLHEGS-01415__v1_T_180706_002124_986/HIG-RunIIFall17DRPremix-02001_1/HIG-RunIIFall17DRPremix-02001_1MergeAODSIMoutput/HIG-RunIIFall17MiniAODv2-01299_0", "timerange":[20180706,20180706]}, "fields":[]}
With this I am able to locate the unmerged jobs.
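For concreteness, writing one such spec file per workflow could look like this sketch (the task list is placeholder data and the output directory is arbitrary; the spec layout follows the example above):
```
import json
import os

# taskname -> day it was run, e.g. taken from the actionshistory dump (placeholder data)
tasks = {"/vlimant_ACDC0_task_.../HIG-RunIIFall17MiniAODv2-01299_0": 20180706}

if not os.path.exists('specs'):
    os.makedirs('specs')
for idx, (task, day) in enumerate(tasks.items()):
    spec = {"spec": {"task": task, "timerange": [day, day]}, "fields": []}
    with open('specs/spec_%05d.json' % idx, 'w') as fobj:
        json.dump(spec, fobj)
```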
However, for one task this runs for around ~10 seconds, so if I create 25,000 spec files and run
a query for each of them, it will take a lot of time. So I would like to know what might be the most
efficient way to locate that many files. Can I somehow make one large query that efficiently
returns all the files that I need, or give a finer time range to speed up the query?
It would be great to know what possibilities exist for this problem.
Many, many thanks for your help!