This project is a simple implementation of a database sorting system that supports external merge sorting of massive data sets. The current version of the program is feature-complete and has no known bugs.
Our project contains four required modules, plus an optional Filter module:
Module Name | Function |
---|---|
Scan | Generate the input records, save them in the input directory and scan them |
Filter (Not required but still implemented) | Add filter conditions to generated data to select specific data |
In-memory Sort | Quicksort the input records into sorted groups in memory, then merge these groups with a loser tree into one large ordered run; this phase covers run generation and spilling the runs to SSD and HDD in batches |
External Merge Sort | Read the runs generated in the previous step from SSD and HDD into memory in pieces, merge them with the loser tree, and write the final sorted result back to HDD |
Verify | Verify the output file to ensure the sort is successful and valid |
In order to make full use of the CPU cache, quicksort is first used to generate cache-size mini runs. Once memory is exhausted, a loser tree merges the cache-size runs into memory-size runs, which are saved to SSD. Here we designed a shared buffer based on the producer-consumer model: while the cache-size run merge produces sorted results, the SSD consumes the sorted data simultaneously through asynchronous I/O without interrupting the CPU, which improves performance.
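As a rough illustration of this producer-consumer design (the class and method names below are hypothetical, not the project's actual code), the merge thread pushes sorted pages into a bounded shared buffer while a separate I/O thread drains them to the SSD:

```cpp
// Minimal sketch of a bounded shared buffer between the merge thread
// (producer) and the SSD-writer thread (consumer). Names are illustrative.
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <vector>

class SharedBuffer {
public:
    explicit SharedBuffer(size_t capacity) : capacity_(capacity) {}

    // Producer: block while the buffer is full, then enqueue a sorted page.
    void produce(std::vector<std::string> page) {
        std::unique_lock<std::mutex> lock(mu_);
        notFull_.wait(lock, [&] { return pages_.size() < capacity_; });
        pages_.push(std::move(page));
        notEmpty_.notify_one();
    }

    // Consumer: block while the buffer is empty; return false once the
    // producer has called close() and every page has been drained.
    bool consume(std::vector<std::string>& page) {
        std::unique_lock<std::mutex> lock(mu_);
        notEmpty_.wait(lock, [&] { return !pages_.empty() || done_; });
        if (pages_.empty()) return false;
        page = std::move(pages_.front());
        pages_.pop();
        notFull_.notify_one();
        return true;
    }

    // Producer signals that no more pages will arrive.
    void close() {
        std::lock_guard<std::mutex> lock(mu_);
        done_ = true;
        notEmpty_.notify_all();
    }

private:
    size_t capacity_;
    bool done_ = false;
    std::queue<std::vector<std::string>> pages_;
    std::mutex mu_;
    std::condition_variable notEmpty_, notFull_;
};
```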
Based on the premise that the SSD and HDD have the same bandwidth, and following the idea from the paper AlphaSort: A Cache-Sensitive Parallel External Sort, we treat the HDD as an extension of the SSD, i.e. as a second SSD, and write to the SSD and HDD at the same time to double the overall I/O bandwidth.
Since the memory-size runs reside on both the SSD and HDD, the first part of each run is read into memory and merged using a loser tree, and each run's buffer is continuously refilled until the merge ends (see the sketch below). The total fan-in therefore equals the number of memory-size runs stored on the SSD and HDD.
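A minimal sketch of that refill step, assuming one small read buffer per run (the struct layout and page size are illustrative, not the project's actual code):

```cpp
// Hypothetical refill helper: each run on SSD/HDD keeps a small in-memory
// buffer; when the buffer of the run that just produced the winner is empty,
// the next page of that run is read from disk before the merge continues.
#include <cstdio>
#include <string>
#include <vector>

struct RunCursor {
    FILE*                    file;      // open run file on SSD or HDD
    std::vector<std::string> buffer;    // records currently held in memory
    size_t                   next = 0;  // index of the next unread record
};

// Refill a cursor with up to pageRecords records; returns false at end of run.
bool refill(RunCursor& run, size_t pageRecords, size_t recordSize) {
    run.buffer.clear();
    run.next = 0;
    std::string rec(recordSize, '\0');
    for (size_t i = 0; i < pageRecords; ++i) {
        if (std::fread(&rec[0], 1, recordSize, run.file) != recordSize) break;
        run.buffer.push_back(rec);
    }
    return !run.buffer.empty();
}
```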
To build and run the project:

```
cd ${workspaceFolder}
make
./Test -c 120000000 -s 1000 -o trace0.txt
```
Where "-c" gives the total number of records, "-s" is the individual record size, and "-o" is the trace of the program run. The size of the input data can be adjusted by changing the command line parameters.
- 1. Quicksort
- 2. Tournament trees [5]
- 3. Replacement selection
- 4. Run size > memory size
- 5. Offset-value coding [5]
- 6. Variable-size records
- 7. Compression
- 8. Prefix truncation
- 9. Minimum count of row & column comparisons [5]
- 10. Cache-size mini runs [5]
- 11. Device-optimized page sizes [5]
- 12. Spilling memory-to-SSD [5]
- 13. Spilling from SSD to disk [5]
- 14. Graceful degradation
- a. into merging [5]
- b. beyond one merge step [5]
- 15. Optimized merge patterns [5]
- 16. Verifying
- a. sets of rows & values [5]
- b. sort order [5]
The tournament tree is a data structure commonly used in sorting algorithms to quickly find the minimum or maximum value. Its main advantage is that, for a sequence of $k$ competing candidates, it finds the smallest or largest element in about $\log_2 k$ comparisons by retaining the results of previous comparisons, so subsequent elements can be selected efficiently. A loser tree is used in this project to maximize the performance of the internal and external merges.
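The sketch below shows a compact loser tree for merging $k$ sorted runs; it illustrates the technique with integer keys and is not the project's exact class.

```cpp
// Loser tree over k runs: internal nodes store the losers of their matches,
// the root stores the overall winner, and replacing the winner replays only
// one root-to-leaf path, i.e. about log2(k) comparisons per record.
#include <utility>
#include <vector>

class LoserTree {
public:
    // keys[i] is the current key of run i; feed a sentinel maximum key when a
    // run is exhausted so it always loses.
    explicit LoserTree(std::vector<long long> keys)
        : k_(static_cast<int>(keys.size())), keys_(std::move(keys)), tree_(k_, -1) {
        for (int i = 0; i < k_; ++i) adjust(i);
    }

    int winner() const { return tree_[0]; }   // run holding the minimum key

    // Feed the winner's next key and replay its path to the root.
    void replaceWinner(long long newKey) {
        keys_[tree_[0]] = newKey;
        adjust(tree_[0]);
    }

private:
    void adjust(int leaf) {
        int winner = leaf;
        for (int parent = (leaf + k_) / 2; parent > 0; parent /= 2) {
            if (tree_[parent] == -1) {              // empty slot during build
                tree_[parent] = winner;
                return;
            }
            if (keys_[tree_[parent]] < keys_[winner])
                std::swap(winner, tree_[parent]);   // node keeps the loser
        }
        tree_[0] = winner;                          // overall winner at root
    }

    int k_;
    std::vector<long long> keys_;
    std::vector<int> tree_;
};
```

A merge loop repeatedly reads `winner()`, emits that run's current record, and calls `replaceWinner` with the run's next key (or a sentinel maximum when the run is exhausted).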
Offset-value coding is an encoding method for sort keys that reduces comparison overhead by avoiding repeated full comparisons of the keys' shared prefixes.
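A minimal sketch of the idea, assuming fixed-length keys and packing the code into one integer (the constants and helper names are assumptions for this example, not the project's API):

```cpp
// Illustrative offset-value coding (OVC): each key is encoded relative to a
// base key known to be no larger than it (e.g. the previous winner in a
// merge). Packing (kKeyLen - offset) first makes "longer shared prefix with
// the base" compare as "smaller code".
#include <cstddef>
#include <cstdint>
#include <string>

constexpr std::size_t kKeyLen = 8;   // fixed key length assumed for the sketch

// offset = first byte where `key` differs from `base`; value = that byte.
std::uint32_t offsetValueCode(const std::string& key, const std::string& base) {
    std::size_t offset = 0;
    while (offset < kKeyLen && key[offset] == base[offset]) ++offset;
    std::uint32_t value = (offset < kKeyLen)
                              ? static_cast<unsigned char>(key[offset])
                              : 0;                       // key equals base
    return static_cast<std::uint32_t>(kKeyLen - offset) * 256 + value;
}

// Compare two keys that are both coded against the same base: the smaller
// code belongs to the smaller key; only a tie forces a real byte comparison.
bool lessByOvc(const std::string& a, std::uint32_t codeA,
               const std::string& b, std::uint32_t codeB) {
    if (codeA != codeB) return codeA < codeB;
    return a < b;                                        // rare fallback
}
```

When two candidates carry codes against the same base record, their order usually follows from a single integer comparison, so the shared prefix is never re-read.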
The record size is controlled by the `-s` command-line argument.
Each record has three columns, and each column is a character array so that the record size can be adjusted easily:
Field Name | Index | Type | Size |
---|---|---|---|
Incl | 0 | char* | variable |
Mem | 1 | char* | variable |
Mgmt | 2 | char* | variable |
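A possible in-memory layout for such a record is sketched below; how the record size is split across the three columns, and the use of the Incl column as the sort key, are assumptions made for illustration.

```cpp
// Illustrative three-column record whose total size is chosen at run time
// (e.g. from the -s argument). The split across columns is an assumption.
#include <cstring>
#include <vector>

struct Record {
    std::vector<char> incl;  // column 0: assumed to hold the sort key
    std::vector<char> mem;   // column 1
    std::vector<char> mgmt;  // column 2

    explicit Record(size_t recordSize)
        : incl(recordSize / 3),
          mem(recordSize / 3),
          mgmt(recordSize - 2 * (recordSize / 3)) {}

    // Ordering is assumed to be decided by the Incl column only.
    bool operator<(const Record& other) const {
        return std::memcmp(incl.data(), other.incl.data(), incl.size()) < 0;
    }
};
```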
The loser tree and offset-value coding work together to minimize the count of comparisons between record rows and columns. The loser tree reduces the number of row comparisons to roughly $n \log_2 k$ when merging $k$ runs containing $n$ records in total, while offset-value coding reduces the number of column (byte) comparisons performed within each row comparison.
Modern computers have multiple levels of storage: registers, cache, RAM, and hard disks (SSD, HDD). The cache sits between the CPU and RAM; it is faster than RAM but has a smaller capacity. To make full use of the CPU cache, the data currently being sorted should fit in the cache, so cache-size mini runs (1 MB) are generated first at the beginning of sorting.
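A minimal sketch of mini-run generation under these assumptions, with `std::sort` standing in for the project's quicksort:

```cpp
// Collect records into chunks that fit the 1 MB cache budget and sort each
// chunk in place; the resulting mini runs feed the cache-to-memory merge.
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

std::vector<std::vector<std::string>>
makeMiniRuns(const std::vector<std::string>& records, size_t recordSize,
             size_t cacheBytes = 1 << 20) {
    const size_t perRun = std::max<size_t>(1, cacheBytes / recordSize);
    std::vector<std::vector<std::string>> runs;
    for (size_t i = 0; i < records.size(); i += perRun) {
        std::vector<std::string> run(
            records.begin() + i,
            records.begin() + std::min(records.size(), i + perRun));
        std::sort(run.begin(), run.end());   // in-cache quicksort step
        runs.push_back(std::move(run));
    }
    return runs;
}
```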
In this project, SSD and HDD have different latencies and bandwidths. According to the calculation formula of data I/O time:
$$
I/O\ time =
\begin{cases}
latency, & data \leq latency \times bandwidth \\
latency \cdot \lceil data / (latency \times bandwidth) \rceil, & data > latency \times bandwidth
\end{cases}
$$
Based on this formula, the optimal device-specific I/O page sizes were calculated (SSD: 10 KB, HDD: 1 MB) and are applied in the project.
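Plugging in the latencies and bandwidths used in this project (SSD: 0.1 ms, 100 MB/s; HDD: 10 ms, 100 MB/s, as reported in the trace below), the break-even point $latency \times bandwidth$ gives exactly these page sizes:

$$
\begin{aligned}
SSD &: 0.1\ ms \times 100\ MB/s = 10\ KB \\
HDD &: 10\ ms \times 100\ MB/s = 1\ MB
\end{aligned}
$$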
Because the data to be sorted is much larger than RAM, after memory-size runs are generated the data spills to the SSD, freeing memory for the next batch of data to be sorted internally.
Because the capacity of the SSD is not enough to accommodate all the data, when the SSD capacity is full, the data will spill to the HDD to ensure that all data will be processed.
Graceful degradation means that when an input is just a little too large to be sorted in memory, there is no need to spill the entire input to disk. A better policy is to spill only as much as absolutely necessary so as to make space for extra input records, to minimize the total I/O cost.
In the project, it is possible that the data to be sorted is a little larger than the storage capacity (Cache, RAM, SSD), so graceful degradation is necessary.
Taking cache-size run generation as an example, the solution is a fixed-size circular queue. When the input is slightly larger than the queue, the excess data overwrites only a small portion at the head of the queue; in other words, only that small amount of data spills while the remaining data stays in the queue, which satisfies graceful degradation.
Loser-tree merging is handled the same way: when the data to be merged is slightly larger than memory, graceful degradation is again implemented with a circular queue, so only the excess data overwrites memory while most of the remaining data stays resident. In addition, when the in-memory data group corresponding to a leaf node is exhausted, we immediately read the next batch of that group from disk instead of waiting for all the data in memory to be consumed. This method also takes advantage of graceful degradation.
This project contains multiple merge steps. In each step, we use the circular queue method mentioned above to achieve graceful degradation.
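A bare-bones sketch of such a fixed-size circular queue (names are illustrative; in the project the displaced head records would be spilled to disk rather than simply discarded):

```cpp
// Fixed-size ring buffer: when the input is slightly larger than the queue,
// only the overflowing records displace entries at the head; everything else
// stays in place, which is the "spill only what is necessary" behaviour.
#include <string>
#include <vector>

class RingBuffer {
public:
    explicit RingBuffer(size_t capacity) : slots_(capacity) {}

    // Append a record; if the buffer is full, overwrite the oldest slot.
    void push(std::string record) {
        slots_[tail_] = std::move(record);
        tail_ = (tail_ + 1) % slots_.size();
        if (size_ == slots_.size())
            head_ = (head_ + 1) % slots_.size();   // head slot is displaced
        else
            ++size_;
    }

    size_t size() const { return size_; }

private:
    std::vector<std::string> slots_;
    size_t head_ = 0, tail_ = 0, size_ = 0;
};
```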
In the verification phase, the order of output records and the consistency of input and output sets are two aspects that need to be checked. The main challenge is how to load data that is much larger than the memory size into memory for verification. The method used here is partition hashing.
- Step 0: Calculate the number of buckets (hash value space) based on the size of the memory and input and output files to ensure that a single bucket can be read into the memory.
- Step 1: Read the output file into memory in batches.
- Step 2: Hash every record in the current batch, distribute the records into buckets based on the hash results, and flush the buckets to disk.
- Step 3: Repeat steps 1 and 2 until the whole output file is processed. Get the hash result (bucket on disk) of the output file.
- Step 4: Repeat steps 1-3 for the input file to get its hash buckets.
- Step 5: For each hash value, load the corresponding bucket files of the input and the output into memory and check whether the two sides contain the same records. Because the hash values match, the two buckets should correspond to the same subset of the original data; any mismatch means the output is inconsistent with the input.
The order verification can be done while scanning the output file: keep the previous record in a variable and check that each current record is not less than the previous one.
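The sketch below outlines the two building blocks, partitioning a file into hash buckets and comparing one pair of buckets; the hash function, file naming, and per-record file opens are simplifications for illustration.

```cpp
// Partition a record file into on-disk hash buckets, then compare buckets of
// the input and output pairwise; each bucket is sized to fit in memory.
#include <algorithm>
#include <fstream>
#include <functional>
#include <string>
#include <vector>

// Append each record of `file` to one of `buckets` bucket files.
void partitionIntoBuckets(const std::string& file, size_t buckets,
                          const std::string& prefix, size_t recordSize) {
    std::ifstream in(file, std::ios::binary);
    std::string rec(recordSize, '\0');
    std::hash<std::string> h;
    while (in.read(&rec[0], recordSize)) {
        size_t b = h(rec) % buckets;
        // Opened per record only to keep the sketch short.
        std::ofstream out(prefix + std::to_string(b),
                          std::ios::binary | std::ios::app);
        out.write(rec.data(), recordSize);
    }
}

// Two bucket files hold the same multiset of records iff their sorted record
// lists are identical (bucket order may differ between input and output).
bool bucketsMatch(const std::string& a, const std::string& b, size_t recordSize) {
    auto load = [&](const std::string& path) {
        std::vector<std::string> recs;
        std::ifstream in(path, std::ios::binary);
        std::string rec(recordSize, '\0');
        while (in.read(&rec[0], recordSize)) recs.push_back(rec);
        std::sort(recs.begin(), recs.end());
        return recs;
    };
    return load(a) == load(b);
}
```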
Results of a 120 GB run:

```
|------------------------Input Arguments-------------------------|
|Records need generating | 120000000 Records|
|Every record's length | 1000 Bytes|
|Trace file name | 120Mx1KB=120GB.result|
|----------------------------------------------------------------|
|----------------------------------------------------------------|
|Scan & Filter & In-memory sort phase |
||---------------------Generate & Scan Data---------------------||
||Records scanned | 120000000 of 120000000 Records||
||-------------------------Filter Data--------------------------||
||Records filtered | 120000000 of 120000000 Records||
||------------------------In-memory Sort------------------------||
||Records sorted | 120000000 of 120000000 Records||
||--------------------[CPU & Memory Status]---------------------||
||CPU Cache used size | 1 MB||
||DRAM used size | 100 MB||
||------------------------[Disk Status]-------------------------||
||Data written into SSD | 10737420000 Bytes ≈ 10737 MB||
||SSD write latency | 0.10 ms||
||SSD write bandwidth | 100 MB/s||
|| | ||
||Data written into HDD | 109262580000 Bytes ≈ 109262 MB||
||HDD write latency | 10.00 ms||
||HDD write bandwidth | 100 MB/s||
||--------------------------------------------------------------||
|Scan & Filter & In-memory sort phase end |
|Total time cost: 5387089 ms |
|----------------------------------------------------------------|
|----------------------------------------------------------------|
|External merge sort phase |
||-----------------------[Memory Status]------------------------||
||DRAM allocated size | 100 MB||
||----------------------[Read Disk Status]----------------------||
||Data read from SSD | 10737420000 Bytes ≈ 10737 MB||
||SSD read latency | 0.10 ms||
||SSD read bandwidth | 100 MB/s||
|| | ||
||Data read from HDD | 109262580000 Bytes ≈ 109262 MB||
||HDD read latency | 10.00 ms||
||HDD read bandwidth | 100 MB/s||
||---------------------[Write Disk Status]----------------------||
||Data written into HDD | 120000000000 Bytes ≈ 120000 MB||
||HDD write latency | 10.00 ms||
||HDD write bandwidth | 100 MB/s||
||--------------------------------------------------------------||
|External merge sort phase end |
|Total time cost: 4098382 ms |
|----------------------------------------------------------------|
|----------------------------------------------------------------|
|Result Verify phase |
||---------------------[Hash Table Status]----------------------||
||Number of input buckets | 2289||
||Number of output buckets | 2289||
||------------------------Verify Results------------------------||
||Is output file ordered? | True||
||Are records in input match with output? | True||
||--------------------------------------------------------------||
|Result Verify phase end |
|Total time cost: 24777976 ms |
|----------------------------------------------------------------|
```
Name | Contributions |
---|---|
Kefan Zheng | Tournament trees, Shared Buffer to spill data from memory to disks, Verifying, Readme |
Tianyu Huang | Scan SSD&HDD and do external merge sort, Write data back to HDD, Trace |
Chuan Tian | Offset-value coding, Readme |
Ethan Fang | Base structure, In-memory Scan&Filter&Quick Sort, Whole process debug |