Commit c68889e

Merge branch 'better-submit' into 'main'
Use spark-submit to run queries

Closes NVIDIA#9 and #3

See merge request nvspark/nds!10
2 parents 5f684e8 + d804965 commit c68889e

5 files changed

+316
-219
lines changed

README.md

Lines changed: 19 additions & 21 deletions
@@ -161,32 +161,30 @@ Sample command for Power Run:
161161
```
162162
python nds.py \
163163
--run power \
164-
--query-stream $TPCDS_HOME/query_templates/query_0.sql \
165-
--input-prefix hdfs:///data/NDS_parquet/ \
164+
--query-stream ./nds_query_streams/query_0.sql \
165+
--input-prefix hdfs:///data/NDS_parquet \
166166
--run-log test.log \
167167
--spark-submit-template power_run_gpu.template \
168-
--csv-output time.csv \
168+
--time-log time.csv \
169169
```
170170
171-
When it's finished, user will see parsed logs from terminal like:
172-
```
173-
......
174-
......
175-
====== Run query4 ======
176-
Time taken: 25532 ms
177-
====== Run query94 ======
178-
Time taken: 886 ms
179-
====== Run query20 ======
180-
Time taken: 1237 ms
181-
====== Run query14a ======
182-
Time taken: 11951 ms
171+
To simplify the performance analysis process, the script will create a local CSV file to save query(including TempView creation) and corresponding execution time. Note: please use `client` mode(set in your `spark-submit-template` file) when running in Yarn distributed environment to make sure the time log is saved correctly in your local path.
183172
184-
====== Total time : 325939 ms ======
173+
The file path is defined by `--time-log` argument.
185174
175+
The command above will use `collect()` action to trigger Spark job for each query. It is also supported to save query output to some place for further verification. User can also specify output format e.g. csv, parquet or orc:
176+
```
177+
python nds.py \
178+
--run power \
179+
--query-stream ./nds_query_streams/query_0.sql \
180+
--input-prefix hdfs:///data/NDS_parquet \
181+
--run-log test.log \
182+
--spark-submit-template power_run_gpu.template \
183+
--time-log time.csv \
184+
--output-prefix hdfs:///data/NDS_power_run_output \
185+
--output-format parquet
186186
```
187187
188-
To simplify the performance analysis process, the script will create a CSV file to save query and corresponding execution time.
189-
The file path is defined by `--csv-output` argument.
190188
191189
### Throughput Run
192190
Throughput Run simulates the scenario that multiple query sessions are running simultaneously in Spark. Different to Power Run, user needs to provide multiple query streams as input for `--query-stream` argument with `,` as seperator. Also the run log will be saved for each query stream independently with index number as naming suffix like _test.log_query_1_, _test.log_query2_ etc. and _time.csv_query_1_, _time.csv_query2_ etc.
@@ -198,11 +196,11 @@ Sample command for Throughput Run:
198196
```
199197
python nds.py \
200198
--run power \
201-
--query-stream $TPCDS_HOME/query_templates/query_0.sql,$TPCDS_HOME/query_templates/query_1.sql \
202-
--input-prefix hdfs:///data/NDS_parquet/ \
199+
--query-stream ./nds_query_streams/query_1.sql,./nds_query_streams/query_2.sql \
200+
--input-prefix hdfs:///data/NDS_parquet \
203201
--run-log test.log \
204202
--spark-submit-template power_run_gpu.template \
205-
--csv-output time.csv \
203+
--time-log time.csv \
206204
```
207205
208206
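The Throughput Run text in this README describes a comma-separated `--query-stream` value and per-stream log files named with an index suffix. A minimal sketch of that mapping follows. The helper name `per_stream_logs`, the 1-based index, and the exact `_query_<n>` suffix are assumptions for illustration (the README shows both `_query_1` and `_query2`), and this is not taken from `nds.py`.

```python
def per_stream_logs(query_stream_arg, run_log, time_log):
    """Pair each query stream with hypothetical per-stream log file names.

    query_stream_arg: comma-separated stream paths, as passed to --query-stream.
    run_log / time_log: base names, as passed to --run-log and --time-log.
    """
    # Split on "," and drop empty entries such as a trailing comma.
    streams = [s.strip() for s in query_stream_arg.split(",") if s.strip()]
    # Assumed naming: base name plus "_query_<index>", counting from 1.
    return [
        (stream, f"{run_log}_query_{i}", f"{time_log}_query_{i}")
        for i, stream in enumerate(streams, start=1)
    ]


if __name__ == "__main__":
    plan = per_stream_logs(
        "./nds_query_streams/query_1.sql,./nds_query_streams/query_2.sql",
        "test.log",
        "time.csv",
    )
    for stream, run_log, time_log in plan:
        print(stream, run_log, time_log)
```

Keeping the stream path and both log names in one tuple makes it easy to hand each entry to a separate worker without recomputing the suffix.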

ds_convert.py

Lines changed: 87 additions & 88 deletions
@@ -43,14 +43,13 @@
4343

4444
from pyspark.sql.types import VarcharType, CharType
4545

46-
def decimalType(non_decimal, precision, scale):
47-
if non_decimal:
48-
return DoubleType()
49-
else:
46+
def decimalType(use_decimal, precision, scale):
47+
if use_decimal:
5048
return DecimalType(precision, scale)
51-
49+
else:
50+
return DoubleType()
5251

53-
def get_schemas(non_decimal):
52+
def get_schemas(use_decimal):
5453
SCHEMAS = {}
5554
SCHEMAS["dbgen_version"] = StructType([
5655
StructField("dv_version", VarcharType(16)),
@@ -71,7 +70,7 @@ def get_schemas(non_decimal):
7170
StructField("ca_state", CharType(2)),
7271
StructField("ca_zip", CharType(10)),
7372
StructField("ca_country", VarcharType(20)),
74-
StructField("ca_gmt_offset", decimalType(non_decimal, 5, 2)),
73+
StructField("ca_gmt_offset", decimalType(use_decimal, 5, 2)),
7574
StructField("ca_location_type", CharType(20))
7675
])
7776

@@ -132,7 +131,7 @@ def get_schemas(non_decimal):
132131
StructField("w_state", CharType(2)),
133132
StructField("w_zip", CharType(10)),
134133
StructField("w_country", VarcharType(20)),
135-
StructField("w_gmt_offset", decimalType(non_decimal, 5, 2))
134+
StructField("w_gmt_offset", decimalType(use_decimal, 5, 2))
136135
])
137136

138137
SCHEMAS["ship_mode"] = StructType([
@@ -175,8 +174,8 @@ def get_schemas(non_decimal):
175174
StructField("i_rec_start_date", DateType()),
176175
StructField("i_rec_end_date", DateType()),
177176
StructField("i_item_desc", VarcharType(200)),
178-
StructField("i_current_price", decimalType(non_decimal, 7, 2)),
179-
StructField("i_wholesale_cost", decimalType(non_decimal, 7, 2)),
177+
StructField("i_current_price", decimalType(use_decimal, 7, 2)),
178+
StructField("i_wholesale_cost", decimalType(use_decimal, 7, 2)),
180179
StructField("i_brand_id", IntegerType()),
181180
StructField("i_brand", CharType(50)),
182181
StructField("i_class_id", IntegerType()),
@@ -222,8 +221,8 @@ def get_schemas(non_decimal):
222221
StructField("s_state", CharType(2)),
223222
StructField("s_zip", CharType(10)),
224223
StructField("s_country", VarcharType(20)),
225-
StructField("s_gmt_offset", decimalType(non_decimal, 5, 2)),
226-
StructField("s_tax_precentage", decimalType(non_decimal, 5, 2))
224+
StructField("s_gmt_offset", decimalType(use_decimal, 5, 2)),
225+
StructField("s_tax_precentage", decimalType(use_decimal, 5, 2))
227226
])
228227

229228
SCHEMAS["call_center"] = StructType([
@@ -256,8 +255,8 @@ def get_schemas(non_decimal):
256255
StructField("cc_state", CharType(2)),
257256
StructField("cc_zip", CharType(10)),
258257
StructField("cc_country", VarcharType(20)),
259-
StructField("cc_gmt_offset", decimalType(non_decimal, 5, 2)),
260-
StructField("cc_tax_percentage", decimalType(non_decimal, 5, 2))
258+
StructField("cc_gmt_offset", decimalType(use_decimal, 5, 2)),
259+
StructField("cc_tax_percentage", decimalType(use_decimal, 5, 2))
261260
])
262261

263262
SCHEMAS["customer"] = StructType([
@@ -306,8 +305,8 @@ def get_schemas(non_decimal):
306305
StructField("web_state", CharType(2)),
307306
StructField("web_zip", CharType(10)),
308307
StructField("web_country", VarcharType(20)),
309-
StructField("web_gmt_offset", decimalType(non_decimal, 5, 2)),
310-
StructField("web_tax_percentage", decimalType(non_decimal, 5, 2))
308+
StructField("web_gmt_offset", decimalType(use_decimal, 5, 2)),
309+
StructField("web_tax_percentage", decimalType(use_decimal, 5, 2))
311310
])
312311

313312
SCHEMAS["store_returns"] = StructType([
@@ -322,15 +321,15 @@ def get_schemas(non_decimal):
322321
StructField("sr_reason_sk", IntegerType()),
323322
StructField("sr_ticket_number", IntegerType(), nullable=False),
324323
StructField("sr_return_quantity", IntegerType()),
325-
StructField("sr_return_amt", decimalType(non_decimal, 7, 2)),
326-
StructField("sr_return_tax", decimalType(non_decimal, 7, 2)),
327-
StructField("sr_return_amt_inc_tax", decimalType(non_decimal, 7, 2)),
328-
StructField("sr_fee", decimalType(non_decimal, 7, 2)),
329-
StructField("sr_return_ship_cost", decimalType(non_decimal, 7, 2)),
330-
StructField("sr_refunded_cash", decimalType(non_decimal, 7, 2)),
331-
StructField("sr_reversed_charge", decimalType(non_decimal, 7, 2)),
332-
StructField("sr_store_credit", decimalType(non_decimal, 7, 2)),
333-
StructField("sr_net_loss", decimalType(non_decimal, 7, 2))
324+
StructField("sr_return_amt", decimalType(use_decimal, 7, 2)),
325+
StructField("sr_return_tax", decimalType(use_decimal, 7, 2)),
326+
StructField("sr_return_amt_inc_tax", decimalType(use_decimal, 7, 2)),
327+
StructField("sr_fee", decimalType(use_decimal, 7, 2)),
328+
StructField("sr_return_ship_cost", decimalType(use_decimal, 7, 2)),
329+
StructField("sr_refunded_cash", decimalType(use_decimal, 7, 2)),
330+
StructField("sr_reversed_charge", decimalType(use_decimal, 7, 2)),
331+
StructField("sr_store_credit", decimalType(use_decimal, 7, 2)),
332+
StructField("sr_net_loss", decimalType(use_decimal, 7, 2))
334333
])
335334

336335
SCHEMAS["household_demographics"] = StructType([
@@ -364,7 +363,7 @@ def get_schemas(non_decimal):
364363
StructField("p_start_date_sk", IntegerType()),
365364
StructField("p_end_date_sk", IntegerType()),
366365
StructField("p_item_sk", IntegerType()),
367-
StructField("p_cost", decimalType(non_decimal, 15, 2)),
366+
StructField("p_cost", decimalType(use_decimal, 15, 2)),
368367
StructField("p_response_target", IntegerType()),
369368
StructField("p_promo_name", CharType(50)),
370369
StructField("p_channel_dmail", CharType(1)),
@@ -418,15 +417,15 @@ def get_schemas(non_decimal):
418417
StructField("cr_reason_sk", IntegerType()),
419418
StructField("cr_order_number", IntegerType(), nullable=False),
420419
StructField("cr_return_quantity", IntegerType()),
421-
StructField("cr_return_amount", decimalType(non_decimal, 7, 2)),
422-
StructField("cr_return_tax", decimalType(non_decimal, 7, 2)),
423-
StructField("cr_return_amt_inc_tax", decimalType(non_decimal, 7, 2)),
424-
StructField("cr_fee", decimalType(non_decimal, 7, 2)),
425-
StructField("cr_return_ship_cost", decimalType(non_decimal, 7, 2)),
426-
StructField("cr_refunded_cash", decimalType(non_decimal, 7, 2)),
427-
StructField("cr_reversed_charge", decimalType(non_decimal, 7, 2)),
428-
StructField("cr_store_credit", decimalType(non_decimal, 7, 2)),
429-
StructField("cr_net_loss", decimalType(non_decimal, 7, 2))
420+
StructField("cr_return_amount", decimalType(use_decimal, 7, 2)),
421+
StructField("cr_return_tax", decimalType(use_decimal, 7, 2)),
422+
StructField("cr_return_amt_inc_tax", decimalType(use_decimal, 7, 2)),
423+
StructField("cr_fee", decimalType(use_decimal, 7, 2)),
424+
StructField("cr_return_ship_cost", decimalType(use_decimal, 7, 2)),
425+
StructField("cr_refunded_cash", decimalType(use_decimal, 7, 2)),
426+
StructField("cr_reversed_charge", decimalType(use_decimal, 7, 2)),
427+
StructField("cr_store_credit", decimalType(use_decimal, 7, 2)),
428+
StructField("cr_net_loss", decimalType(use_decimal, 7, 2))
430429
])
431430

432431
SCHEMAS["web_returns"] = StructType([
@@ -445,15 +444,15 @@ def get_schemas(non_decimal):
445444
StructField("wr_reason_sk", IntegerType()),
446445
StructField("wr_order_number", IntegerType(), nullable=False),
447446
StructField("wr_return_quantity", IntegerType()),
448-
StructField("wr_return_amt", decimalType(non_decimal, 7, 2)),
449-
StructField("wr_return_tax", decimalType(non_decimal, 7, 2)),
450-
StructField("wr_return_amt_inc_tax", decimalType(non_decimal, 7, 2)),
451-
StructField("wr_fee", decimalType(non_decimal, 7, 2)),
452-
StructField("wr_return_ship_cost", decimalType(non_decimal, 7, 2)),
453-
StructField("wr_refunded_cash", decimalType(non_decimal, 7, 2)),
454-
StructField("wr_reversed_charge", decimalType(non_decimal, 7, 2)),
455-
StructField("wr_account_credit", decimalType(non_decimal, 7, 2)),
456-
StructField("wr_net_loss", decimalType(non_decimal, 7, 2))
447+
StructField("wr_return_amt", decimalType(use_decimal, 7, 2)),
448+
StructField("wr_return_tax", decimalType(use_decimal, 7, 2)),
449+
StructField("wr_return_amt_inc_tax", decimalType(use_decimal, 7, 2)),
450+
StructField("wr_fee", decimalType(use_decimal, 7, 2)),
451+
StructField("wr_return_ship_cost", decimalType(use_decimal, 7, 2)),
452+
StructField("wr_refunded_cash", decimalType(use_decimal, 7, 2)),
453+
StructField("wr_reversed_charge", decimalType(use_decimal, 7, 2)),
454+
StructField("wr_account_credit", decimalType(use_decimal, 7, 2)),
455+
StructField("wr_net_loss", decimalType(use_decimal, 7, 2))
457456
])
458457

459458
SCHEMAS["web_sales"] = StructType([
@@ -476,21 +475,21 @@ def get_schemas(non_decimal):
476475
StructField("ws_promo_sk", IntegerType()),
477476
StructField("ws_order_number", IntegerType(), nullable=False),
478477
StructField("ws_quantity", IntegerType()),
479-
StructField("ws_wholesale_cost", decimalType(non_decimal, 7, 2)),
480-
StructField("ws_list_price", decimalType(non_decimal, 7, 2)),
481-
StructField("ws_sales_price", decimalType(non_decimal, 7, 2)),
482-
StructField("ws_ext_discount_amt", decimalType(non_decimal, 7, 2)),
483-
StructField("ws_ext_sales_price", decimalType(non_decimal, 7, 2)),
484-
StructField("ws_ext_wholesale_cost", decimalType(non_decimal, 7, 2)),
485-
StructField("ws_ext_list_price", decimalType(non_decimal, 7, 2)),
486-
StructField("ws_ext_tax", decimalType(non_decimal, 7, 2)),
487-
StructField("ws_coupon_amt", decimalType(non_decimal, 7, 2)),
488-
StructField("ws_ext_ship_cost", decimalType(non_decimal, 7, 2)),
489-
StructField("ws_net_paid", decimalType(non_decimal, 7, 2)),
490-
StructField("ws_net_paid_inc_tax", decimalType(non_decimal, 7, 2)),
491-
StructField("ws_net_paid_inc_ship", decimalType(non_decimal, 7, 2)),
492-
StructField("ws_net_paid_inc_ship_tax", decimalType(non_decimal, 7, 2)),
493-
StructField("ws_net_profit", decimalType(non_decimal, 7, 2))
478+
StructField("ws_wholesale_cost", decimalType(use_decimal, 7, 2)),
479+
StructField("ws_list_price", decimalType(use_decimal, 7, 2)),
480+
StructField("ws_sales_price", decimalType(use_decimal, 7, 2)),
481+
StructField("ws_ext_discount_amt", decimalType(use_decimal, 7, 2)),
482+
StructField("ws_ext_sales_price", decimalType(use_decimal, 7, 2)),
483+
StructField("ws_ext_wholesale_cost", decimalType(use_decimal, 7, 2)),
484+
StructField("ws_ext_list_price", decimalType(use_decimal, 7, 2)),
485+
StructField("ws_ext_tax", decimalType(use_decimal, 7, 2)),
486+
StructField("ws_coupon_amt", decimalType(use_decimal, 7, 2)),
487+
StructField("ws_ext_ship_cost", decimalType(use_decimal, 7, 2)),
488+
StructField("ws_net_paid", decimalType(use_decimal, 7, 2)),
489+
StructField("ws_net_paid_inc_tax", decimalType(use_decimal, 7, 2)),
490+
StructField("ws_net_paid_inc_ship", decimalType(use_decimal, 7, 2)),
491+
StructField("ws_net_paid_inc_ship_tax", decimalType(use_decimal, 7, 2)),
492+
StructField("ws_net_profit", decimalType(use_decimal, 7, 2))
494493
])
495494

496495
SCHEMAS["catalog_sales"] = StructType([
@@ -513,21 +512,21 @@ def get_schemas(non_decimal):
513512
StructField("cs_promo_sk", IntegerType()),
514513
StructField("cs_order_number", IntegerType(), nullable=False),
515514
StructField("cs_quantity", IntegerType()),
516-
StructField("cs_wholesale_cost", decimalType(non_decimal, 7, 2)),
517-
StructField("cs_list_price", decimalType(non_decimal, 7, 2)),
518-
StructField("cs_sales_price", decimalType(non_decimal, 7, 2)),
519-
StructField("cs_ext_discount_amt", decimalType(non_decimal, 7, 2)),
520-
StructField("cs_ext_sales_price", decimalType(non_decimal, 7, 2)),
521-
StructField("cs_ext_wholesale_cost", decimalType(non_decimal, 7, 2)),
522-
StructField("cs_ext_list_price", decimalType(non_decimal, 7, 2)),
523-
StructField("cs_ext_tax", decimalType(non_decimal, 7, 2)),
524-
StructField("cs_coupon_amt", decimalType(non_decimal, 7, 2)),
525-
StructField("cs_ext_ship_cost", decimalType(non_decimal, 7, 2)),
526-
StructField("cs_net_paid", decimalType(non_decimal, 7, 2)),
527-
StructField("cs_net_paid_inc_tax", decimalType(non_decimal, 7, 2)),
528-
StructField("cs_net_paid_inc_ship", decimalType(non_decimal, 7, 2)),
529-
StructField("cs_net_paid_inc_ship_tax", decimalType(non_decimal, 7, 2)),
530-
StructField("cs_net_profit", decimalType(non_decimal, 7, 2))
515+
StructField("cs_wholesale_cost", decimalType(use_decimal, 7, 2)),
516+
StructField("cs_list_price", decimalType(use_decimal, 7, 2)),
517+
StructField("cs_sales_price", decimalType(use_decimal, 7, 2)),
518+
StructField("cs_ext_discount_amt", decimalType(use_decimal, 7, 2)),
519+
StructField("cs_ext_sales_price", decimalType(use_decimal, 7, 2)),
520+
StructField("cs_ext_wholesale_cost", decimalType(use_decimal, 7, 2)),
521+
StructField("cs_ext_list_price", decimalType(use_decimal, 7, 2)),
522+
StructField("cs_ext_tax", decimalType(use_decimal, 7, 2)),
523+
StructField("cs_coupon_amt", decimalType(use_decimal, 7, 2)),
524+
StructField("cs_ext_ship_cost", decimalType(use_decimal, 7, 2)),
525+
StructField("cs_net_paid", decimalType(use_decimal, 7, 2)),
526+
StructField("cs_net_paid_inc_tax", decimalType(use_decimal, 7, 2)),
527+
StructField("cs_net_paid_inc_ship", decimalType(use_decimal, 7, 2)),
528+
StructField("cs_net_paid_inc_ship_tax", decimalType(use_decimal, 7, 2)),
529+
StructField("cs_net_profit", decimalType(use_decimal, 7, 2))
531530
])
532531

533532
SCHEMAS["store_sales"] = StructType([
@@ -542,18 +541,18 @@ def get_schemas(non_decimal):
542541
StructField("ss_promo_sk", IntegerType()),
543542
StructField("ss_ticket_number", IntegerType(), nullable=False),
544543
StructField("ss_quantity", IntegerType()),
545-
StructField("ss_wholesale_cost", decimalType(non_decimal, 7, 2)),
546-
StructField("ss_list_price", decimalType(non_decimal, 7, 2)),
547-
StructField("ss_sales_price", decimalType(non_decimal, 7, 2)),
548-
StructField("ss_ext_discount_amt", decimalType(non_decimal, 7, 2)),
549-
StructField("ss_ext_sales_price", decimalType(non_decimal, 7, 2)),
550-
StructField("ss_ext_wholesale_cost", decimalType(non_decimal, 7, 2)),
551-
StructField("ss_ext_list_price", decimalType(non_decimal, 7, 2)),
552-
StructField("ss_ext_tax", decimalType(non_decimal, 7, 2)),
553-
StructField("ss_coupon_amt", decimalType(non_decimal, 7, 2)),
554-
StructField("ss_net_paid", decimalType(non_decimal, 7, 2)),
555-
StructField("ss_net_paid_inc_tax", decimalType(non_decimal, 7, 2)),
556-
StructField("ss_net_profit", decimalType(non_decimal, 7, 2))
544+
StructField("ss_wholesale_cost", decimalType(use_decimal, 7, 2)),
545+
StructField("ss_list_price", decimalType(use_decimal, 7, 2)),
546+
StructField("ss_sales_price", decimalType(use_decimal, 7, 2)),
547+
StructField("ss_ext_discount_amt", decimalType(use_decimal, 7, 2)),
548+
StructField("ss_ext_sales_price", decimalType(use_decimal, 7, 2)),
549+
StructField("ss_ext_wholesale_cost", decimalType(use_decimal, 7, 2)),
550+
StructField("ss_ext_list_price", decimalType(use_decimal, 7, 2)),
551+
StructField("ss_ext_tax", decimalType(use_decimal, 7, 2)),
552+
StructField("ss_coupon_amt", decimalType(use_decimal, 7, 2)),
553+
StructField("ss_net_paid", decimalType(use_decimal, 7, 2)),
554+
StructField("ss_net_paid_inc_tax", decimalType(use_decimal, 7, 2)),
555+
StructField("ss_net_profit", decimalType(use_decimal, 7, 2))
557556
])
558557
return SCHEMAS
559558

@@ -602,8 +601,8 @@ def store(df, filename, prefix=""):
602601

603602
results = {}
604603

605-
schemas = get_schemas(args.non_decimal)
606-
604+
schemas = get_schemas(use_decimal = not args.non_decimal)
605+
607606
for fn, schema in schemas.items():
608607
results[fn] = timeit.timeit(lambda: store(load(f"{fn}{args.input_suffix}", schema, prefix=args.input_prefix), f"{fn}", args.output_prefix), number=1)
609608
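The rename from `non_decimal` to `use_decimal` flips the parameter's polarity, which is why the call site now passes `not args.non_decimal`. A minimal sketch of that pattern follows. It assumes `--non_decimal` is a `store_true` flag (the argparse definition is not part of this diff), and it uses plain strings as stand-ins for pyspark's `DecimalType` and `DoubleType` so it runs without Spark. The names `decimal_type` and `gmt_offset_type` are hypothetical.

```python
import argparse


def decimal_type(use_decimal, precision, scale):
    """Mirror the positive-flag logic; strings stand in for the Spark types."""
    if use_decimal:
        return f"decimal({precision},{scale})"
    return "double"


def parse_args(argv):
    parser = argparse.ArgumentParser()
    # Assumed definition: the CLI keeps its negative flag for compatibility.
    parser.add_argument("--non_decimal", action="store_true")
    return parser.parse_args(argv)


def gmt_offset_type(argv):
    args = parse_args(argv)
    # Negate once at the boundary so internal code only sees the positive flag.
    return decimal_type(use_decimal=not args.non_decimal, precision=5, scale=2)


if __name__ == "__main__":
    print(gmt_offset_type([]))
    print(gmt_offset_type(["--non_decimal"]))
```

Negating once where the arguments are read keeps the CLI unchanged while the schema code reads as a plain positive condition.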
