Spark zipnumcluster job (draft) #8
base: main
Conversation
Also note: @sebastian-nagel - I do have a "more perfect sorting" version which uses reservoir sampling in my local history - if we end up needing it, it's already done.
After thinking longer about it: some types of queries will still work, but others won't. The problem starts when the software reading the CDX index assumes that it is totally sorted. This especially applies to any kind of range query. For example, a range query that reports the number of matching pages basically does so by counting the number of lines in the secondary index (cluster.idx) that fall inside the queried range.
Because there might also be results in the zipnum block before the first matching one, 1 is added to the number of lines. If the zipnum blocks are non-contiguous, we'd need to add 1 for every contiguous range of blocks, and the result naturally becomes less precise. In addition, there is more work to do for larger range queries. Compare that with the statement "Generally, this overhead [of the zipnum index] is negligible when looking up large indexes, and non-existent when doing a range query across many CDX lines." (https://pywb.readthedocs.io/en/latest/manual/indexing.html#zipnum). On the other hand, queries for single URLs might work the same and with the same performance, independent of the partitioning scheme.
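That counting could be sketched roughly as follows (not pywb's actual code; cluster_idx_keys is assumed to hold the sorted first SURT keys of all zipnum blocks, range_start/range_end the bounds of the query):

import bisect

def estimate_block_count(cluster_idx_keys, range_start, range_end):
    # number of secondary-index lines whose first key falls inside the range
    lo = bisect.bisect_left(cluster_idx_keys, range_start)
    hi = bisect.bisect_left(cluster_idx_keys, range_end)
    # add 1 for the preceding block, which may also contain matching records
    return (hi - lo) + 1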
What does this mean? Either we implement total order sorting, or all kinds of range queries also need to be tested. Of course, even then we'd need to document the new CDX index sorting for our users and spread this information. We do not know which assumptions are made in any third-party software and whether they rely on total order sorting. This alone might make it less work to just implement total order sorting.
As I mentioned in the next sentence fragment - I used the same technique as the Hadoop version: reservoir sampling to produce the ranges, then another pass using those ranges to do the shards. I will find that version in my local history and check it when I work on this next.
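For reference, the sampling pass could be sketched like this (plain reservoir sampling; keys, sample_size and num_partitions are placeholder names, not taken from this PR):

import random

def reservoir_sample(keys, sample_size):
    # keep a uniformly random sample of sample_size keys from the stream
    sample = []
    for i, key in enumerate(keys):
        if i < sample_size:
            sample.append(key)
        else:
            j = random.randint(0, i)
            if j < sample_size:
                sample[j] = key
    return sample

# partition boundaries can then be taken as num_partitions - 1 evenly
# spaced keys of the sorted sample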
Maybe it's not necessary to do the sampling step - Spark has a sortBy (or sortByKey) method which does a total order sort into N partitions. We use it to sort the vertices of the host-level webgraph before enumerating them. Same as with reservoir sampling: the partitions are not perfectly balanced, but the balance is acceptable. Note: Spark also has methods that only sort the data within the partitions; they are usually named with "WithinPartitions", see for example repartitionAndSortWithinPartitions.
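A hedged sketch of both options (the app name, num_shards and the sample records are placeholders, not taken from this repository); the RDD is assumed to hold (SURT key, CDX line) pairs:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sort-sketch").getOrCreate()
num_shards = 2
cdx_rdd = spark.sparkContext.parallelize([
    ("org,example)/ 20240101000000", "cdx line ..."),
    ("com,example)/ 20240101000000", "cdx line ..."),
])

# total order sort: Spark samples the keys to choose range boundaries,
# so the key ranges of the num_shards output partitions do not overlap
totally_sorted = cdx_rdd.sortByKey(numPartitions=num_shards)

# sort only within each partition: the (default hash) partitioner decides
# where a key lands, so the key ranges of the shards may overlap
locally_sorted = cdx_rdd.repartitionAndSortWithinPartitions(
    numPartitions=num_shards)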
Indeed - I was aware of these, but not all of them, and only some of the nuance. I've done some deep reading, and, by my best assessment, my informal definition of "perfect sort" is that the last record of one partition will be less than the first record of the next partition (so, if I go through the partitions in order, I will never get records out of order).
I'm leaning towards repartitionAndSortWithinPartitions, using a hash of the URL - but I may change my mind after running a few jobs and seeing how uneven they are. IMHO, 5-10% variance seems fine; if it's much more than that, it doesn't feel as good (though that's why I want to read the zipnum code as I state below - it may not be a practical issue, so it could be fine). Since I'm waiting on/monitoring other jobs anyway, I'm going to take another block of time tomorrow to do a similar deep read of the zipnum code, just so I have a much better understanding of that as well (specifically, I'm going to read the index server's code which USES zipnum, as that's the part that is still murky to me). Thanks again for the input @sebastian-nagel, much appreciated.
Everyone's expectation is that the SURT value ranges of the CDX index shards and Parquet shards should not overlap. For CDX, that's expected by pywb. For Parquet, it's important for optimization. We do have a few Parquet indexes for which that isn't true, and it's a problem we will fix someday.
Got it - I think with the hash and reservoir-sampled approaches I outlined, they should not overlap at the shard level (as the former would not allow it, and the latter would match what we already do today pretty exactly). There MAY be gzip chunks that overlap (very small amounts with reservoir sampling, and potentially rather larger amounts with hashing) - but, as long as the secondary index reflects those properly, I don't think it'll be an issue, based on my read of the index server side of things.
I will get back to this task Monday, so there's plenty of time to discuss if I'm wrong there. I will bring it up on the eng. call, and if we need to talk, we can do it then.
What Greg means is that there must be zero overlap between the ranges defined by the first and the last SURT of each zipnum block. It's important because the secondary index (cluster.idx) only stores the first SURT, not the last one. But with strict sorting, the last SURT of a block must be lower than (sort before) the first one of the next zipnum block. For Parquet, zero overlap is an optimization but not a requirement: every Parquet file and row group carries the min and max values in the statistics in its footer.
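To make the property concrete, a small sanity check could look like this (a sketch, not project code; block_ranges is an assumed list of (first_surt, last_surt) pairs in index order):

def blocks_non_overlapping(block_ranges):
    # the last SURT of each zipnum block must not sort after the first
    # SURT of the following block
    for (_first_a, last_a), (first_b, _last_b) in zip(block_ranges, block_ranges[1:]):
        if last_a > first_b:
            return False
    return True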
I have updates to this task in another branch - for now, I'm going to preserve the existing approach (reservoir sampling) and get this task finished. I have it all working locally, will be testing tonight/tomorrow in S3 on a full crawl, and will then re-do the PR to reflect that (I'll probably merge it into this branch, to keep things simple and preserve the above history).
…res (need to refactor ccpyspark back into it now)
There are some hiccups when trying to create a zipnum index locally. Finally, I was able to create a zipnum index by setting:
- --output_base_url=file:/absolute/path/to/index/ (but this creates a directory tree file:/absolute/path/to/index/ in the current directory)
- --partition_boundries_file=/absolute/path/to/index/cluster.idx (please also fix the typo "boundries" - it should be "boundaries")

The created index files, both cdx-*.gz and cluster.idx, look good:
- all records from the input are in the zipnum index
- no sorting issues found
- offsets in cluster.idx point to valid zipnum blocks (a rough version of this check is sketched below)
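A rough version of that offset check might look like the following sketch; it assumes the usual zipnum layout where each cluster.idx line is tab-separated into key, shard file name, offset and length, and that the shard files sit next to cluster.idx (adjust if the actual layout differs):

import gzip
import os

def check_cluster_idx(index_dir):
    # verify that every (offset, length) slice is a complete gzip member
    with open(os.path.join(index_dir, 'cluster.idx')) as idx:
        for line in idx:
            fields = line.rstrip('\n').split('\t')
            shard, offset, length = fields[1], int(fields[2]), int(fields[3])
            with open(os.path.join(index_dir, shard), 'rb') as shard_file:
                shard_file.seek(offset)
                block = shard_file.read(length)
            gzip.decompress(block)  # raises if the block is truncated or invalid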
zipnumcluster-ccpyspark.py
Outdated
parser.add_argument("--output_base_url", required=False, | ||
default='my_cdx_bucket', | ||
help="destination for output") | ||
parser.add_argument("--partition_boundries_file", required=False, |
If --partition_boundries_file is not specified, the job fails:
Traceback (most recent call last):
File ".../webarchive-indexing/zipnumcluster-ccpyspark.py", line 263, in <module>
job.run()
File ".../webarchive-indexing/sparkcc.py", line 187, in run
self.run_job(session)
File ".../webarchive-indexing/zipnumcluster-ccpyspark.py", line 238, in run_job
self.write_output_file(boundries_file_uri, f)
File ".../webarchive-indexing/sparkcc.py", line 840, in write_output_file
uri_match = self.data_url_pattern.match(uri)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: expected string or bytes-like object, got 'NoneType'
Is this a required argument?
Yes - it's either where the file is written to, or read from (if it already exists)
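Given that, one option would be to simply make the argument required so the job fails with a clear message instead of the TypeError above (a sketch, assuming the argparse setup from the excerpt; the suggested spelling fix is included):

parser.add_argument("--partition_boundaries_file", required=True,
                    help="file the partition boundaries are written to, "
                         "or read from if it already exists")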
# loop over the output files and concatenate them into a single final file
with open('cluster.idx', 'wb') as f:
    for idx_file,_ in rdd:
        with self.fetch_file(output_base_url + idx_file) as idx_fd:
The default of --output_base_url is my_cdx_bucket (no trailing slash). So, when testing locally, I get the error:
4/12/20 14:03:37 INFO ZipNumClusterCdx: Reading local file my_cdx_bucketidx-00000.idx
Traceback (most recent call last):
File ".../webarchive-indexing/zipnumcluster-ccpyspark.py", line 263, in <module>
job.run()
File ".../webarchive-indexing/sparkcc.py", line 187, in run
self.run_job(session)
File ".../webarchive-indexing/zipnumcluster-ccpyspark.py", line 252, in run_job
with self.fetch_file(output_base_url + idx_file) as idx_fd:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../webarchive-indexing/sparkcc.py", line 727, in fetch_file
warctemp = open(uri, 'rb')
^^^^^^^^^^^^^^^
FileNotFoundError: [Errno 2] No such file or directory: '.../webarchive-indexing/my_cdx_bucketidx-00000.idx'
Looks like, for testing locally, --output_base_url needs to be of the form file:/absolute/path/.
yes, that's correct (and is shown in the integration tests/scripts in the cc-redact project)
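A possible guard against the missing separator (a sketch, reusing the names from the excerpt above rather than the PR's actual fix):

output_base_url = self.args.output_base_url
# make sure the base URL ends with a separator before per-partition
# index file names are appended to it
if not output_base_url.endswith('/'):
    output_base_url += '/'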
help="number of partitions/shards") | ||
|
||
def run_job(self, session): | ||
os.makedirs(self.args.output_base_url, exist_ok=True) |
See below: if --output_base_url is file:/absolute/path/, on Linux a relative folder file: is created in the current directory.
I will double-check that - it's not happening for me, but I may have omitted file:, so I may be testing it differently.
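If the file: form is kept, one way to avoid the literal file: directory is to strip the URL scheme before calling os.makedirs - a sketch, assuming only plain paths and file: URLs need to be handled at this point:

import os
from urllib.parse import urlparse

def ensure_local_output_dir(output_base_url):
    parsed = urlparse(output_base_url)
    if parsed.scheme in ('', 'file'):
        # plain path or file: URL: create the directory locally
        os.makedirs(parsed.path, exist_ok=True)
    # remote locations (e.g. s3://...) are left to the output writer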
zipnumcluster-ccpyspark.py
Outdated
rdd = rdd.repartitionAndSortWithinPartitions(
    numPartitions=num_partitions,
    partitionFunc=lambda k: get_partition_id(k,boundaries),
    keyfunc=lambda x: x[0]) \
This will sort on the first character of the SURT key only. Should use the identity function (just leave out the param keyfunc).
Hmm - OK, I had put that in there because I thought x was a tuple at that point and [0] was the key - but based on what you're saying, does keyfunc already get passed the first element of the tuple? I will double-check the docs, but it sounds reasonable.
Yup, right you are - this is from the Spark source:
key=lambda k_v: keyfunc(k_v[0])
That's a subtle difference between sortByKey and sortBy. And of course, from the documentation it's even less clear how repartitionAndSortWithinPartitions() behaves. Ok, in Scala or Java this would just result in an exception ("Expected list or tuple, got string"); with Python this is tricky.
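For reference, a sketch of the corrected call (get_partition_id, boundaries and num_partitions are modeled on the names in the diff above, not taken from the final code); with keyfunc left at its default identity function, the full SURT key is compared, not just its first character:

import bisect

def get_partition_id(key, boundaries):
    # boundaries: sorted list of num_partitions - 1 split keys
    return bisect.bisect_left(boundaries, key)

rdd = rdd.repartitionAndSortWithinPartitions(
    numPartitions=num_partitions,
    partitionFunc=lambda k: get_partition_id(k, boundaries))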
This is a cc-pyspark version of the zipnum clustering job (without the use of the mrjob framework).