Signed-off-by: Sayed Bilal Bari <[email protected]>
bilalbari committed Jun 13, 2024
1 parent d7e9df4 commit 19e8c01
Showing 3 changed files with 55 additions and 51 deletions.
42 changes: 24 additions & 18 deletions nds-h/nds_h_gen_data.py
@@ -59,6 +59,7 @@
'supplier'
]


def generate_data_local(args, range_start, range_end, tool_path):
"""Generate data to local file system. TPC-DS tool will generate all table data under target
folder without creating sub-folders for each table. So we add extra code to create sub folder
@@ -88,10 +89,10 @@ def generate_data_local(args, range_start, range_end, tool_path):
procs = []
for i in range(range_start, range_end + 1):
dbgen = ["-s", args.scale,
"-C", args.parallel,
"-S", str(i),
"-v", "Y",
"-f","Y"]
"-C", args.parallel,
"-S", str(i),
"-v", "Y",
"-f", "Y"]
procs.append(subprocess.Popen(
["./dbgen"] + dbgen, cwd=str(work_dir)))
# wait for data generation to complete
@@ -105,23 +106,25 @@ def generate_data_local(args, range_start, range_end, tool_path):
for table in table_names:
print('mkdir -p {}/{}'.format(data_dir, table))
subprocess.run(['mkdir', '-p', data_dir + '/' + table])
if (table != 'region' and table !='nation'):
if (table != 'region' and table != 'nation'):
for i in range(range_start, range_end + 1):
subprocess.run(['mv', f'{work_dir}/{table}.tbl.{i}',
f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
else:
subprocess.run(['mv', f'{work_dir}/{table}.tbl',
f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
# delete date file has no parallel number suffix in the file name, move separately
# show summary
subprocess.run(['du', '-h', '-d1', data_dir])


def clean_temp_data(temp_data_path):
cmd = ['hadoop', 'fs', '-rm', '-r', '-skipTrash', temp_data_path]
print(" ".join(cmd))
subprocess.run(cmd)

def merge_temp_tables(temp_data_path, parent_data_path, update):

def merge_temp_tables(temp_data_path, parent_data_path):
"""Helper functions for incremental data generation. Move data in temporary child range path to
parent directory.
@@ -146,6 +149,7 @@ def generate_data_hdfs(args, jar_path):
subprocess.run(cmd)
clean_temp_data(temp_data_path)


def generate_data_hdfs(args, jar_path):
"""generate data to hdfs using TPC-DS dsdgen tool. Support incremental generation: due to the
limit of hdfs, each range data will be generated under a temporary folder then move to target
@@ -163,7 +167,7 @@ def generate_data_hdfs(args, jar_path):
raise Exception('No Hadoop binary found in current environment, ' +
'please install Hadoop for data generation in cluster.')
# Submit hadoop MR job to generate data
cmd = ['hadoop', 'jar', str(jar_path)]
cmd = ['hadoop', 'jar', str(jar_path)]
cmd += ['-p', args.parallel, '-s', args.scale]
# get dsdgen.jar path, assume user won't change file structure
tpcds_gen_path = jar_path.parent.parent.absolute()
@@ -182,14 +186,15 @@ def generate_data_hdfs(args, jar_path):
try:
subprocess.run(cmd, check=True, cwd=str(tpcds_gen_path))
# only move delete table for data maintenance
merge_temp_tables(temp_data_path, args.data_dir, args.update)
merge_temp_tables(temp_data_path, args.data_dir)
finally:
clean_temp_data(temp_data_path)
else:
cmd.extend(["-d", args.data_dir])
subprocess.run(cmd, check=True, cwd=str(tpcds_gen_path))
# only move delete table for data maintenance



def generate_data(args):
jar_path, tool_path = check_build_nds_h()
range_start = 1
@@ -201,29 +206,30 @@ def generate_data(args):
else:
generate_data_local(args, range_start, range_end, tool_path)


if __name__ == "__main__":
parser = parser = argparse.ArgumentParser()
parser.add_argument("type",
choices=["local", "hdfs"] ,
choices=["local", "hdfs"],
help="file system to save the generated data.")
parser.add_argument("scale",
help="volume of data to generate in GB. Accepted SF - 1,10, 100, 300, 1000 \
,3000, 10000, 30000,"
)
)
parser.add_argument("parallel",
type=parallel_value_type,
help="build data in <parallel_value> separate chunks"
)
)
parser.add_argument("data_dir",
help="generate data in directory.")
parser.add_argument('--range',
help='Used for incremental data generation, meaning which part of child' +
'chunks are generated in one run. Format: "start,end", both are inclusive. ' +
'e.g. "1,100". Note: the child range must be within the "parallel", ' +
'"--parallel 100 --range 100,200" is illegal.')
'chunks are generated in one run. Format: "start,end", both are inclusive. ' +
'e.g. "1,100". Note: the child range must be within the "parallel", ' +
'"--parallel 100 --range 100,200" is illegal.')
parser.add_argument("--overwrite_output",
action="store_true",
help="overwrite if there has already existing data in the path provided.")

args = parser.parse_args()
generate_data(args)
generate_data(args)
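
For reference, the updated script is driven entirely by the arguments defined above; a typical local run might look like this (scale factor, parallelism, and output directory are illustrative, not taken from this commit):

    python nds_h_gen_data.py local 100 100 /data/nds-h --overwrite_output

An incremental run could append --range 1,50 to generate only the first fifty of the hundred parallel chunks.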
2 changes: 1 addition & 1 deletion nds-h/tpch-gen/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
62 changes: 30 additions & 32 deletions nds-h/tpch-gen/src/main/java/org/nvidia/nds_h/GenTable.java
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,29 +16,26 @@
*/

package org.nvidia.nds_h;

import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.*;

import org.apache.hadoop.filecache.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;

import org.apache.commons.cli.*;
import org.apache.commons.*;

import java.io.*;
import java.nio.*;
import java.util.*;

import java.net.*;
import java.math.*;
import java.security.*;
import java.util.Objects;


public class GenTable extends Configured implements Tool {
@@ -54,21 +51,7 @@ public int run(String[] args) throws Exception {

CommandLineParser parser = new BasicParser();
getConf().setInt("io.sort.mb", 4);
org.apache.commons.cli.Options options = new org.apache.commons.cli.Options();
/*
* These are the various options being passed to the class
* -s scale
* -d output directory
* -t specific table data
* -p nunber of parallel parts
* -o overwrite output directory if exists
*/
options.addOption("s","scale", true, "scale");
options.addOption("d","dir", true, "dir");
options.addOption("t","table", true, "table");
options.addOption("p", "parallel", true, "parallel");
options.addOption("o", "overwrite", false, "overwrite existing data");
options.addOption("r", "range", true, "child range in one data generation run");
Options options = getOptions();
CommandLine line = parser.parse(options, remainingArgs);

if(!line.hasOption("scale")) {
@@ -166,7 +149,7 @@ public int run(String[] args) throws Exception {
MultipleOutputs.addNamedOutput(job, "text",
TextOutputFormat.class, LongWritable.class, Text.class);

boolean success = job.waitForCompletion(true);
job.waitForCompletion(true);

// cleanup
fs.delete(in, false);
@@ -175,6 +158,25 @@ public int run(String[] args) throws Exception {
return 0;
}

private static Options getOptions() {
Options options = new Options();
/*
* These are the various options being passed to the class
* -s scale
* -d output directory
* -t specific table data
* -p number of parallel files to be generated
* -o overwrite output directory if exists
*/
options.addOption("s","scale", true, "scale");
options.addOption("d","dir", true, "dir");
options.addOption("t","table", true, "table");
options.addOption("p", "parallel", true, "parallel");
options.addOption("o", "overwrite", false, "overwrite existing data");
options.addOption("r", "range", true, "child range in one data generation run");
return options;
}
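
Together with the driver logic in nds_h_gen_data.py above, these options imply a submission command roughly of the following form; the jar name and HDFS path are illustrative, not taken from this commit:

    hadoop jar tpch-gen.jar -s 100 -p 100 -d /user/tpch/sf100 -o

with -r start,end appended when a child range is generated incrementally.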

/*
* This function just copies the jar from the local to hdfs temp
* location for access by the mappers
@@ -229,7 +231,7 @@ public Path genInput(String table, int scale, int parallel, int rangeStart, int
}
else{
// TODO - update using map based approach for a cleaner implementation
if(table.equalsIgnoreCase("customers")){
if(table.equalsIgnoreCase("customer")){
String cmd = baseCmd + "-T c";
out.writeBytes(cmd + "\n");
}
@@ -253,11 +255,11 @@ else if(table.equalsIgnoreCase("parts")){
String cmd = baseCmd + "-T P";
out.writeBytes(cmd + "\n");
}
else if(table.equalsIgnoreCase("patsupp")){
else if(table.equalsIgnoreCase("partsupp")){
String cmd = baseCmd + "-T S";
out.writeBytes(cmd + "\n");
}
else if(table.equalsIgnoreCase("suppliers")){
else if(table.equalsIgnoreCase("supplier")){
String cmd = baseCmd + "-T s";
out.writeBytes(cmd + "\n");
}
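
The TODO above notes that this if/else chain could be replaced with a map-based lookup. A minimal sketch of that shape follows; the TABLE_FLAGS and tableCommand names are hypothetical, only the table-to-flag pairs visible in this diff are included, and it assumes the java.util.* import already present in the file:

    // Hypothetical sketch of the map-based approach mentioned in the TODO.
    private static final Map<String, String> TABLE_FLAGS = new HashMap<>();
    static {
        TABLE_FLAGS.put("customer", "c");
        TABLE_FLAGS.put("parts", "P");
        TABLE_FLAGS.put("partsupp", "S");
        TABLE_FLAGS.put("supplier", "s");
    }

    private static String tableCommand(String baseCmd, String table) {
        String flag = TABLE_FLAGS.get(table.toLowerCase());
        if (flag == null) {
            throw new IllegalArgumentException("Unknown table: " + table);
        }
        // Mirrors the existing branches, e.g. baseCmd + "-T c" for customer.
        return baseCmd + "-T " + flag;
    }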
@@ -347,13 +349,9 @@ else if(table.equals("s"))

final String suffixNew = suffix;

FilenameFilter tables = new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.endsWith(suffixNew);
}
};
FilenameFilter tables = (dir, name) -> name.endsWith(suffixNew);

for(File f: cwd.listFiles(tables)) {
for(File f: Objects.requireNonNull(cwd.listFiles(tables))) {
if(f != null)
{
System.out.println("Processing file: "+f.getName());
