From b9ba0f398723f56f7e85c3562570a35ff8730372 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Wed, 1 May 2024 18:55:30 -0400 Subject: [PATCH] Various improvements to the TPC-C runner (#504) - Batch inserts - Add Zipfian skew to how keys are selected - Remove use of fetchall() where not needed (does not really impact BRAD though) These changes reduce new order latency from ~95 ms down to ~45 ms. Part of #487. --- experiments/17-chbenchmark/common.sh | 23 +- experiments/17-chbenchmark/debug/COND | 45 +++ .../17-chbenchmark/debug/run_aurora_direct.sh | 20 +- .../17-chbenchmark/debug/run_aurora_timing.sh | 20 +- .../calibration/transactions/chbenchmark/COND | 3 + .../transactions/chbenchmark/run_instance.sh | 7 +- .../py-tpcc/pytpcc/drivers/auroradriver.py | 81 ++--- .../pytpcc/drivers/auroratimingdriver.py | 287 +++++++++++++++++- .../py-tpcc/pytpcc/drivers/braddriver.py | 61 ++-- .../py-tpcc/pytpcc/runtime/executor.py | 39 ++- workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py | 12 +- 11 files changed, 510 insertions(+), 88 deletions(-) diff --git a/experiments/17-chbenchmark/common.sh b/experiments/17-chbenchmark/common.sh index 04ad6a0b..2db49e0e 100644 --- a/experiments/17-chbenchmark/common.sh +++ b/experiments/17-chbenchmark/common.sh @@ -14,13 +14,18 @@ function start_brad() { function run_tpcc() { pushd ../../../workloads/chbenchmark/py-tpcc/ - RECORD_DETAILED_STATS=1 python3 -m pytpcc.tpcc brad \ - --no-load \ - --config $abs_txn_config_file \ - --warehouses $txn_warehouses \ - --duration $run_for_s \ - --clients $t_clients \ - --scalefactor $txn_scale_factor & + local args=( + --no-load + --config $abs_txn_config_file + --warehouses $txn_warehouses + --duration $run_for_s + --clients $t_clients + --scalefactor $txn_scale_factor + ) + if [[ ! -z $txn_zipfian_alpha ]]; then + args+=(--zipfian-alpha $txn_zipfian_alpha) + fi + RECORD_DETAILED_STATS=1 python3 -m pytpcc.tpcc brad "${args[@]}" & tpcc_pid=$! popd } @@ -91,6 +96,10 @@ function extract_named_arguments() { if [[ $phys_arg =~ --txn-config-file=.+ ]]; then txn_config_file=${phys_arg:18} fi + + if [[ $phys_arg =~ --txn-zipfian-alpha=.+ ]]; then + txn_zipfian_alpha=${phys_arg:20} + fi done } diff --git a/experiments/17-chbenchmark/debug/COND b/experiments/17-chbenchmark/debug/COND index 4cfa490f..7feaa352 100644 --- a/experiments/17-chbenchmark/debug/COND +++ b/experiments/17-chbenchmark/debug/COND @@ -1,3 +1,6 @@ +ZIPFIAN_ALPHA = 5.0 + + run_command( name="txn_lat", run="./run_tpcc.sh", @@ -13,6 +16,22 @@ run_command( }, ) +run_command( + name="txn_lat_zipf", + run="./run_tpcc.sh", + options={ + "physical-config-file": "../../../config/physical_config_chbench.yml", + "system-config-file": "debug_config.yml", # Relative to one level up. + "txn-config-file": "brad.config", + "schema-name": "chbenchmark", + "txn-warehouses": 1740, + "txn-scale-factor": 1, # TBD + "t-clients": 1, # TBD + "run-for-s": 180, + "txn-zipfian-alpha": ZIPFIAN_ALPHA, + }, +) + run_command( name="aurora_direct", run="./run_aurora_direct.sh", @@ -25,6 +44,19 @@ run_command( }, ) +run_command( + name="aurora_direct_zipf", + run="./run_aurora_direct.sh", + options={ + "txn-config-file": "aurora.config", + "txn-warehouses": 1740, + "txn-scale-factor": 1, # TBD + "t-clients": 1, # TBD + "run-for-s": 180, + "txn-zipfian-alpha": ZIPFIAN_ALPHA, + }, +) + run_experiment( name="aurora_timing", run="./run_aurora_timing.sh", @@ -36,3 +68,16 @@ run_experiment( "run-for-s": 30, }, ) + +run_experiment( + name="aurora_timing_zipf", + run="./run_aurora_timing.sh", + options={ + "txn-config-file": "aurora.config", + "txn-warehouses": 1740, + "txn-scale-factor": 1, # TBD + "t-clients": 1, # TBD + "run-for-s": 30, + "txn-zipfian-alpha": ZIPFIAN_ALPHA, + }, +) diff --git a/experiments/17-chbenchmark/debug/run_aurora_direct.sh b/experiments/17-chbenchmark/debug/run_aurora_direct.sh index df6b232a..36d85f2b 100755 --- a/experiments/17-chbenchmark/debug/run_aurora_direct.sh +++ b/experiments/17-chbenchmark/debug/run_aurora_direct.sh @@ -9,10 +9,18 @@ extract_named_arguments $@ abs_txn_config_file=$(realpath $txn_config_file) cd ../../../workloads/chbenchmark/py-tpcc/ -RECORD_DETAILED_STATS=1 python3 -m pytpcc.tpcc aurora \ - --no-load \ - --config $abs_txn_config_file \ - --warehouses $txn_warehouses \ - --duration $run_for_s \ - --clients $t_clients \ + +args=( + --no-load + --config $abs_txn_config_file + --warehouses $txn_warehouses + --duration $run_for_s + --clients $t_clients --scalefactor $txn_scale_factor +) + +if [[ ! -z $txn_zipfian_alpha ]]; then + args+=(--zipfian-alpha $txn_zipfian_alpha) +fi + +RECORD_DETAILED_STATS=1 python3 -m pytpcc.tpcc aurora "${args[@]}" diff --git a/experiments/17-chbenchmark/debug/run_aurora_timing.sh b/experiments/17-chbenchmark/debug/run_aurora_timing.sh index cb96028a..d28f1633 100755 --- a/experiments/17-chbenchmark/debug/run_aurora_timing.sh +++ b/experiments/17-chbenchmark/debug/run_aurora_timing.sh @@ -9,10 +9,18 @@ extract_named_arguments $@ abs_txn_config_file=$(realpath $txn_config_file) cd ../../../workloads/chbenchmark/py-tpcc/ -RECORD_DETAILED_STATS=1 python3 -m pytpcc.tpcc auroratiming \ - --no-load \ - --config $abs_txn_config_file \ - --warehouses $txn_warehouses \ - --duration $run_for_s \ - --clients $t_clients \ + +args=( + --no-load + --config $abs_txn_config_file + --warehouses $txn_warehouses + --duration $run_for_s + --clients $t_clients --scalefactor $txn_scale_factor +) + +if [[ ! -z $txn_zipfian_alpha ]]; then + args+=(--zipfian-alpha $txn_zipfian_alpha) +fi + +RECORD_DETAILED_STATS=1 python3 -m pytpcc.tpcc auroratiming "${args[@]}" diff --git a/tools/calibration/transactions/chbenchmark/COND b/tools/calibration/transactions/chbenchmark/COND index f59e559a..f8901f59 100644 --- a/tools/calibration/transactions/chbenchmark/COND +++ b/tools/calibration/transactions/chbenchmark/COND @@ -13,6 +13,8 @@ COND_INSTANCES = { instance: instance.replace(".", "_").replace("db.", "") for instance in INSTANCES } +ZIPFIAN_ALPHA = 5.0 + combine( name="all", deps=[ @@ -36,6 +38,7 @@ for instance in INSTANCES: "txn-warehouses": 1740, "txn-config-file": "aurora.config", "schema-name": "chbenchmark", + "txn-zipfian-alpha": ZIPFIAN_ALPHA, "instance": instance, }, ) diff --git a/tools/calibration/transactions/chbenchmark/run_instance.sh b/tools/calibration/transactions/chbenchmark/run_instance.sh index dfe0c6b5..3890358c 100755 --- a/tools/calibration/transactions/chbenchmark/run_instance.sh +++ b/tools/calibration/transactions/chbenchmark/run_instance.sh @@ -38,6 +38,10 @@ function extract_named_arguments() { if [[ $phys_arg =~ --instance=.+ ]]; then instance=${phys_arg:11} fi + + if [[ $phys_arg =~ --txn-zipfian-alpha=.+ ]]; then + txn_zipfian_alpha=${phys_arg:20} + fi done } @@ -74,7 +78,8 @@ RECORD_DETAILED_STATS=1 python3 -m pytpcc.tpcc aurora \ --duration $run_for_s \ --clients $t_clients \ --scalefactor 1 \ - --lat-sample-prob 0.25 + --lat-sample-prob 0.25 \ + --txn-zipfian-alpha $txn_zipfian_alpha popd >&2 echo "Waiting 10 seconds before retrieving metrics..." diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py index 79e65ebc..26d10812 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py @@ -36,6 +36,8 @@ "getStockInfo": "SELECT s_quantity, s_data, s_ytd, s_order_cnt, s_remote_cnt, s_dist_{:02d} FROM stock WHERE s_i_id = {} AND s_w_id = {}", # d_id, ol_i_id, ol_supply_w_id "updateStock": "UPDATE stock SET s_quantity = {}, s_ytd = {}, s_order_cnt = {}, s_remote_cnt = {} WHERE s_i_id = {} AND s_w_id = {}", # s_quantity, s_order_cnt, s_remote_cnt, ol_i_id, ol_supply_w_id "createOrderLine": "INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_delivery_d, ol_quantity, ol_amount, ol_dist_info) VALUES ({}, {}, {}, {}, {}, {}, '{}', {}, {}, '{}')", # o_id, d_id, w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info + "createOrderLineMultivalue": "INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_delivery_d, ol_quantity, ol_amount, ol_dist_info) VALUES ", + "createOrderLineValues": "({}, {}, {}, {}, {}, {}, '{}', {}, {}, '{}')", }, "ORDER_STATUS": { "getCustomerByCustomerId": "SELECT c_id, c_first, c_middle, c_last, c_balance FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", # w_id, d_id, c_id @@ -81,7 +83,7 @@ class AuroraDriver(AbstractDriver): } def __init__(self, ddl: str) -> None: - super().__init__("brad", ddl) + super().__init__("aurora", ddl) self._connection: Optional[PsycopgConnection] = None self._cursor: Optional[PsycopgCursor] = None self._config: Dict[str, Any] = {} @@ -127,19 +129,19 @@ def doDelivery(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: self._cursor.execute_sync("BEGIN") for d_id in range(1, constants.DISTRICTS_PER_WAREHOUSE + 1): self._cursor.execute_sync(q["getNewOrder"].format(d_id, w_id)) - r = self._cursor.fetchall_sync() - if len(r) == 0: + r = self._cursor.fetchone_sync() + if r is None: ## No orders for this district: skip it. Note: This must be reported if > 1% continue - no_o_id = r[0][0] + no_o_id = r[0] self._cursor.execute_sync(q["getCId"].format(no_o_id, d_id, w_id)) - r = self._cursor.fetchall_sync() - c_id = r[0][0] + r = self._cursor.fetchone_sync() + c_id = r[0] self._cursor.execute_sync(q["sumOLAmount"].format(no_o_id, d_id, w_id)) - r = self._cursor.fetchall_sync() - ol_total = decimal.Decimal(r[0][0]) + r = self._cursor.fetchone_sync() + ol_total = decimal.Decimal(r[0]) self._cursor.execute_sync( q["deleteNewOrder"].format(d_id, w_id, no_o_id) @@ -203,8 +205,8 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ## Determine if this is an all local order or not all_local = all_local and i_w_ids[i] == w_id self._cursor.execute_sync(q["getItemInfo"].format(i_ids[i])) - r = self._cursor.fetchall_sync() - items.append(r[0]) + r = self._cursor.fetchone_sync() + items.append(r) assert len(items) == len(i_ids) ## TPCC defines 1% of neworder gives a wrong itemid, causing rollback. @@ -219,18 +221,18 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ## Collect Information from WAREHOUSE, DISTRICT, and CUSTOMER ## ---------------- self._cursor.execute_sync(q["getWarehouseTaxRate"].format(w_id)) - r = self._cursor.fetchall_sync() - w_tax = r[0][0] + r = self._cursor.fetchone_sync() + w_tax = r[0] self._cursor.execute_sync(q["getDistrict"].format(d_id, w_id)) - r = self._cursor.fetchall_sync() - district_info = r[0] + r = self._cursor.fetchone_sync() + district_info = r d_tax = district_info[0] d_next_o_id = district_info[1] self._cursor.execute_sync(q["getCustomer"].format(w_id, d_id, c_id)) - r = self._cursor.fetchall_sync() - customer_info = r[0] + r = self._cursor.fetchone_sync() + customer_info = r c_discount = customer_info[0] ## ---------------- @@ -261,6 +263,7 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ## Insert Order Item Information ## ---------------- item_data = [] + insert_value_strings = [] total = 0 for i in range(len(i_ids)): ol_number = i + 1 @@ -276,15 +279,15 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: self._cursor.execute_sync( q["getStockInfo"].format(d_id, ol_i_id, ol_supply_w_id) ) - r = self._cursor.fetchall_sync() - if len(r) == 0: + r = self._cursor.fetchone_sync() + if r is None: logger.warning( "No STOCK record for (ol_i_id=%d, ol_supply_w_id=%d)", ol_i_id, ol_supply_w_id, ) continue - stockInfo = r[0] + stockInfo = r s_quantity = stockInfo[0] s_ytd = decimal.Decimal(stockInfo[2]) s_order_cnt = int(stockInfo[3]) @@ -326,7 +329,7 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ol_amount = ol_quantity * i_price total += ol_amount - createOrderLine = q["createOrderLine"].format( + createOrderLineValues = q["createOrderLineValues"].format( d_next_o_id, d_id, w_id, @@ -338,7 +341,7 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ol_amount, s_dist_xx, ) - self._cursor.execute_sync(createOrderLine) + insert_value_strings.append(createOrderLineValues) ## Add the info to be returned item_data.append( @@ -346,6 +349,12 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ) ## FOR + # Do one multivalue insert. + insertOrderLines = q["createOrderLineMultivalue"] + ", ".join( + insert_value_strings + ) + self._cursor.execute_sync(insertOrderLines) + ## Commit! self._cursor.execute_sync("COMMIT") @@ -385,8 +394,8 @@ def doOrderStatus(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: self._cursor.execute_sync( q["getCustomerByCustomerId"].format(w_id, d_id, c_id) ) - r = self._cursor.fetchall_sync() - customer = r[0] + r = self._cursor.fetchone_sync() + customer = r else: # Get the midpoint customer's id self._cursor.execute_sync( @@ -404,13 +413,13 @@ def doOrderStatus(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: getLastOrder = q["getLastOrder"].format(w_id, d_id, c_id) self._cursor.execute_sync(getLastOrder) - r = self._cursor.fetchall_sync() - order = r[0] + r = self._cursor.fetchone_sync() + order = r if order: self._cursor.execute_sync( q["getOrderLines"].format(w_id, d_id, order[0]) ) - r = self._cursor.fetchall_sync() + r = self._cursor.fetchone_sync() orderLines = r else: orderLines = [] @@ -443,8 +452,8 @@ def doPayment(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: self._cursor.execute_sync( q["getCustomerByCustomerId"].format(w_id, d_id, c_id) ) - r = self._cursor.fetchall_sync() - customer = r[0] + r = self._cursor.fetchone_sync() + customer = r else: # Get the midpoint customer's id self._cursor.execute_sync( @@ -464,12 +473,12 @@ def doPayment(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: c_data = customer[17] self._cursor.execute_sync(q["getWarehouse"].format(w_id)) - r = self._cursor.fetchall_sync() - warehouse = r[0] + r = self._cursor.fetchone_sync() + warehouse = r self._cursor.execute_sync(q["getDistrict"].format(w_id, d_id)) - r = self._cursor.fetchall_sync() - district = r[0] + r = self._cursor.fetchone_sync() + district = r self._cursor.execute_sync( q["updateWarehouseBalance"].format(h_amount, w_id) @@ -548,8 +557,8 @@ def doStockLevel(self, params: Dict[str, Any]) -> int: self._cursor.execute_sync("BEGIN") self._cursor.execute_sync(q["getOId"].format(w_id, d_id)) - r = self._cursor.fetchall_sync() - result = r[0] + r = self._cursor.fetchone_sync() + result = r assert result o_id = result[0] @@ -558,8 +567,8 @@ def doStockLevel(self, params: Dict[str, Any]) -> int: w_id, d_id, o_id, (o_id - 20), w_id, threshold ) ) - r = self._cursor.fetchall_sync() - result = r[0] + r = self._cursor.fetchone_sync() + result = r self._cursor.execute_sync("COMMIT") return int(result[0]) diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroratimingdriver.py b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroratimingdriver.py index d1d88cf4..8443fd53 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroratimingdriver.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroratimingdriver.py @@ -38,6 +38,8 @@ "getStockInfo": "SELECT s_quantity, s_data, s_ytd, s_order_cnt, s_remote_cnt, s_dist_{:02d} FROM stock WHERE s_i_id = {} AND s_w_id = {}", # d_id, ol_i_id, ol_supply_w_id "updateStock": "UPDATE stock SET s_quantity = {}, s_ytd = {}, s_order_cnt = {}, s_remote_cnt = {} WHERE s_i_id = {} AND s_w_id = {}", # s_quantity, s_order_cnt, s_remote_cnt, ol_i_id, ol_supply_w_id "createOrderLine": "INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_delivery_d, ol_quantity, ol_amount, ol_dist_info) VALUES ({}, {}, {}, {}, {}, {}, '{}', {}, {}, '{}')", # o_id, d_id, w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info + "createOrderLineMultivalue": "INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_delivery_d, ol_quantity, ol_amount, ol_dist_info) VALUES ", + "createOrderLineValues": "({}, {}, {}, {}, {}, {}, '{}', {}, {}, '{}')", }, "ORDER_STATUS": { "getCustomerByCustomerId": "SELECT c_id, c_first, c_middle, c_last, c_balance FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", # w_id, d_id, c_id @@ -83,7 +85,7 @@ class AuroraTimingDriver(AbstractDriver): } def __init__(self, ddl: str) -> None: - super().__init__("brad", ddl) + super().__init__("aurora timing", ddl) self._connection: Optional[PsycopgConnection] = None self._cursor: Optional[PsycopgCursor] = None self._config: Dict[str, Any] = {} @@ -127,7 +129,7 @@ def executeStart(self): measure_file_path = cond.in_output_dir("aurora_timing.csv") self._measure_file = open(measure_file_path, "w", encoding="UTF-8") print( - "init,begin,getitems,getwdc,getorder,insertorder,commit,collect,total", + "init,begin,getitems,getwdc,getorder,insertorder,commit,collect,multi_insert_time,total", file=self._measure_file, ) @@ -224,7 +226,7 @@ def doDelivery(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: print(traceback.format_exc()) raise - def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: + def doNewOrderOriginal(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: try: assert self._cursor is not None @@ -494,6 +496,285 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: print(traceback.format_exc()) raise + def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: + try: + assert self._cursor is not None + + no_start = time.time() + q = TXN_QUERIES["NEW_ORDER"] + w_id = params["w_id"] + d_id = params["d_id"] + c_id = params["c_id"] + o_entry_d = params["o_entry_d"] + i_ids = params["i_ids"] + i_w_ids = params["i_w_ids"] + i_qtys = params["i_qtys"] + + assert len(i_ids) > 0 + assert len(i_ids) == len(i_w_ids) + assert len(i_ids) == len(i_qtys) + + no_pbegin = time.time() + self._cursor.execute_sync("BEGIN") + no_abegin = time.time() + all_local = True + items = [] + for i in range(len(i_ids)): + ## Determine if this is an all local order or not + all_local = all_local and i_w_ids[i] == w_id + self._cursor.execute_sync(q["getItemInfo"].format(i_ids[i])) + r = self._cursor.fetchone_sync() + items.append(r) + assert len(items) == len(i_ids) + no_getitems = time.time() + + ## TPCC defines 1% of neworder gives a wrong itemid, causing rollback. + ## Note that this will happen with 1% of transactions on purpose. + for item in items: + if item is None or len(item) == 0: + self._cursor.execute_sync("ROLLBACK") + return + ## FOR + + ## ---------------- + ## Collect Information from WAREHOUSE, DISTRICT, and CUSTOMER + ## ---------------- + wdc_start = time.time() + get_warehouse = q["getWarehouseTaxRate"].format(w_id) + self._cursor.execute_sync(get_warehouse) + r = self._cursor.fetchone_sync() + w_tax = r[0] + wdc_warehouse_tax_rate = time.time() + + get_district = q["getDistrict"].format(d_id, w_id) + self._cursor.execute_sync(get_district) + r = self._cursor.fetchone_sync() + district_info = r + d_tax = district_info[0] + d_next_o_id = district_info[1] + wdc_district = time.time() + + get_customer = q["getCustomer"].format(w_id, d_id, c_id) + self._cursor.execute_sync(get_customer) + r = self._cursor.fetchone_sync() + customer_info = r + c_discount = customer_info[0] + no_get_wdc_info = time.time() + + if self._query_log_file is not None: + print(get_warehouse, file=self._query_log_file) + print(get_district, file=self._query_log_file) + print(get_customer, file=self._query_log_file) + + ## ---------------- + ## Insert Order Information + ## ---------------- + ol_cnt = len(i_ids) + o_carrier_id = constants.NULL_CARRIER_ID + + self._cursor.execute_sync( + q["incrementNextOrderId"].format(d_next_o_id + 1, d_id, w_id) + ) + createOrder = q["createOrder"].format( + d_next_o_id, + d_id, + w_id, + c_id, + o_entry_d.strftime("%Y-%m-%d %H:%M:%S"), + o_carrier_id, + ol_cnt, + 1 if all_local else 0, + ) + self._cursor.execute_sync(createOrder) + self._cursor.execute_sync( + q["createNewOrder"].format(d_next_o_id, d_id, w_id) + ) + no_ins_order_info = time.time() + + ## ---------------- + ## Insert Order Item Information + ## ---------------- + item_data = [] + total = 0 + insert_metadata = [] + insert_value_strs = [] + for i in range(len(i_ids)): + io_start = time.time() + ol_number = i + 1 + ol_supply_w_id = i_w_ids[i] + ol_i_id = i_ids[i] + ol_quantity = i_qtys[i] + + itemInfo = items[i] + i_name = itemInfo[1] + i_data = itemInfo[2] + i_price = decimal.Decimal(itemInfo[0]) + io_init = time.time() + + get_stock_info = q["getStockInfo"].format(d_id, ol_i_id, ol_supply_w_id) + self._cursor.execute_sync(get_stock_info) + r = self._cursor.fetchone_sync() + io_fetch_stock = time.time() + if r is None: + logger.warning( + "No STOCK record for (ol_i_id=%d, ol_supply_w_id=%d)", + ol_i_id, + ol_supply_w_id, + ) + continue + stockInfo = r + s_quantity = stockInfo[0] + s_ytd = decimal.Decimal(stockInfo[2]) + s_order_cnt = int(stockInfo[3]) + s_remote_cnt = int(stockInfo[4]) + s_data = stockInfo[1] + s_dist_xx = stockInfo[5] # Fetches data from the s_dist_[d_id] column + + ## Update stock + s_ytd += ol_quantity + if s_quantity >= ol_quantity + 10: + s_quantity = s_quantity - ol_quantity + else: + s_quantity = s_quantity + 91 - ol_quantity + s_order_cnt += 1 + + if ol_supply_w_id != w_id: + s_remote_cnt += 1 + io_stock_prep = time.time() + + update_stock = q["updateStock"].format( + s_quantity, + s_ytd.quantize(decimal.Decimal("1.00")), + s_order_cnt, + s_remote_cnt, + ol_i_id, + ol_supply_w_id, + ) + self._cursor.execute_sync(update_stock) + io_update_stock = time.time() + + if ( + i_data.find(constants.ORIGINAL_STRING) != -1 + and s_data.find(constants.ORIGINAL_STRING) != -1 + ): + brand_generic = "B" + else: + brand_generic = "G" + + ## Transaction profile states to use "ol_quantity * i_price" + ol_amount = ol_quantity * i_price + total += ol_amount + io_ol_prep = time.time() + + createOrderLineValues = q["createOrderLineValues"].format( + d_next_o_id, + d_id, + w_id, + ol_number, + ol_i_id, + ol_supply_w_id, + o_entry_d.strftime("%Y-%m-%d %H:%M:%S"), + ol_quantity, + ol_amount, + s_dist_xx, + ) + insert_value_strs.append(createOrderLineValues) + io_ol_insert = time.time() + + ## Add the info to be returned + item_data.append( + (i_name, s_quantity, brand_generic, i_price, ol_amount) + ) + io_ol_append = time.time() + + insert_metadata.append( + ( + io_init - io_start, + io_fetch_stock - io_init, + io_stock_prep - io_fetch_stock, + io_update_stock - io_stock_prep, + io_ol_prep - io_update_stock, + io_ol_insert - io_ol_prep, + io_ol_append - io_ol_insert, + io_ol_append - io_start, + ) + ) + + if self._query_log_file is not None: + print(get_stock_info, file=self._query_log_file) + print(update_stock, file=self._query_log_file) + + no_mv_insert_pre = time.time() + ## FOR + insert_order_line_query = q["createOrderLineMultivalue"] + ", ".join( + insert_value_strs + ) + self._cursor.execute_sync(insert_order_line_query) + no_mv_insert_after = time.time() + if self._query_log_file is not None: + print(insert_order_line_query, file=self._query_log_file) + no_insert_order_line = time.time() + + ## Commit! + self._cursor.execute_sync("COMMIT") + no_commit = time.time() + + ## Adjust the total for the discount + # print "c_discount:", c_discount, type(c_discount) + # print "w_tax:", w_tax, type(w_tax) + # print "d_tax:", d_tax, type(d_tax) + total = int( + total + * (1 - decimal.Decimal(c_discount)) + * (1 + decimal.Decimal(w_tax) + decimal.Decimal(d_tax)) + ) + + ## Pack up values the client is missing (see TPC-C 2.4.3.5) + misc = [(w_tax, d_tax, d_next_o_id, total)] + no_collect = time.time() + + if self._measure_file is not None: + init_time = no_pbegin - no_start + begin_time = no_abegin - no_pbegin + getitems_time = no_getitems - no_abegin + getwdc_time = no_get_wdc_info - no_getitems + getorder_time = no_ins_order_info - no_get_wdc_info + insertorder_time = no_insert_order_line - no_ins_order_info + commit_time = no_commit - no_insert_order_line + collect_time = no_collect - no_commit + total_time = no_collect - no_start + multi_insert_time = no_mv_insert_after - no_mv_insert_pre + print( + f"{init_time},{begin_time},{getitems_time},{getwdc_time},{getorder_time},{insertorder_time},{commit_time},{collect_time},{multi_insert_time},{total_time}", + file=self._measure_file, + ) + + if self._wdc_stats_file is not None: + tax_rate_time = wdc_warehouse_tax_rate - wdc_start + district_time = wdc_district - wdc_warehouse_tax_rate + customer_time = no_get_wdc_info - wdc_district + total_time = no_get_wdc_info - wdc_start + print( + f"{tax_rate_time},{district_time},{customer_time},{total_time}", + file=self._wdc_stats_file, + ) + + if self._ol_stats_file is not None: + for im in insert_metadata: + print( + "{},{},{},{},{},{},{},{},{}".format(self._ins_ol_counter, *im), + file=self._ol_stats_file, + ) + self._ins_ol_counter += 1 + + return [customer_info, misc, item_data] + + except Exception as ex: + if self._nonsilent_errs: + print("Error in NEWORDER", str(ex)) + print(traceback.format_exc()) + raise + def doOrderStatus(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: try: assert self._cursor is not None diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py index 9458a0c1..fa6e678f 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py @@ -35,6 +35,8 @@ "getStockInfo": "SELECT s_quantity, s_data, s_ytd, s_order_cnt, s_remote_cnt, s_dist_{:02d} FROM stock WHERE s_i_id = {} AND s_w_id = {}", # d_id, ol_i_id, ol_supply_w_id "updateStock": "UPDATE stock SET s_quantity = {}, s_ytd = {}, s_order_cnt = {}, s_remote_cnt = {} WHERE s_i_id = {} AND s_w_id = {}", # s_quantity, s_order_cnt, s_remote_cnt, ol_i_id, ol_supply_w_id "createOrderLine": "INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_delivery_d, ol_quantity, ol_amount, ol_dist_info) VALUES ({}, {}, {}, {}, {}, {}, '{}', {}, {}, '{}')", # o_id, d_id, w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info + "createOrderLineMultivalue": "INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_delivery_d, ol_quantity, ol_amount, ol_dist_info) VALUES ", + "createOrderLineValues": "({}, {}, {}, {}, {}, {}, '{}', {}, {}, '{}')", }, "ORDER_STATUS": { "getCustomerByCustomerId": "SELECT c_id, c_first, c_middle, c_last, c_balance FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", # w_id, d_id, c_id @@ -119,7 +121,7 @@ def doDelivery(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ol_delivery_d = params["ol_delivery_d"] result: List[Tuple[Any, ...]] = [] - self._client.run_query_json("BEGIN") + self._client.run_query_ignore_results("BEGIN") for d_id in range(1, constants.DISTRICTS_PER_WAREHOUSE + 1): r, _ = self._client.run_query_json(q["getNewOrder"].format(d_id, w_id)) if len(r) == 0: @@ -137,17 +139,17 @@ def doDelivery(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ) ol_total = decimal.Decimal(r[0][0]) - self._client.run_query_json( + self._client.run_query_ignore_results( q["deleteNewOrder"].format(d_id, w_id, no_o_id) ) updateOrders = q["updateOrders"].format( o_carrier_id, no_o_id, d_id, w_id ) - self._client.run_query_json(updateOrders) + self._client.run_query_ignore_results(updateOrders) updateOrderLine = q["updateOrderLine"].format( ol_delivery_d.strftime("%Y-%m-%d %H:%M:%S"), no_o_id, d_id, w_id ) - self._client.run_query_json(updateOrderLine) + self._client.run_query_ignore_results(updateOrderLine) # These must be logged in the "result file" according to TPC-C 2.7.2.2 (page 39) # We remove the queued time, completed time, w_id, and o_carrier_id: the client can figure @@ -158,7 +160,7 @@ def doDelivery(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ), "ol_total is NULL: there are no order lines. This should not happen" assert ol_total > 0.0 - self._client.run_query_json( + self._client.run_query_ignore_results( q["updateCustomer"].format( ol_total.quantize(decimal.Decimal("1.00")), c_id, d_id, w_id ) @@ -166,7 +168,7 @@ def doDelivery(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: result.append((d_id, no_o_id)) - self._client.run_query_json("COMMIT") + self._client.run_query_ignore_results("COMMIT") return result except Exception as ex: @@ -192,7 +194,7 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: assert len(i_ids) == len(i_w_ids) assert len(i_ids) == len(i_qtys) - self._client.run_query_json("BEGIN") + self._client.run_query_ignore_results("BEGIN") all_local = True items = [] for i in range(len(i_ids)): @@ -206,7 +208,7 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ## Note that this will happen with 1% of transactions on purpose. for item in items: if len(item) == 0: - self._client.run_query_json("ROLLBACK") + self._client.run_query_ignore_results("ROLLBACK") return ## FOR @@ -233,7 +235,7 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ol_cnt = len(i_ids) o_carrier_id = constants.NULL_CARRIER_ID - self._client.run_query_json( + self._client.run_query_ignore_results( q["incrementNextOrderId"].format(d_next_o_id + 1, d_id, w_id) ) createOrder = q["createOrder"].format( @@ -246,8 +248,8 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ol_cnt, 1 if all_local else 0, ) - self._client.run_query_json(createOrder) - self._client.run_query_json( + self._client.run_query_ignore_results(createOrder) + self._client.run_query_ignore_results( q["createNewOrder"].format(d_next_o_id, d_id, w_id) ) @@ -256,6 +258,7 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ## ---------------- item_data = [] total = 0 + insert_value_strings = [] for i in range(len(i_ids)): ol_number = i + 1 ol_supply_w_id = i_w_ids[i] @@ -296,7 +299,7 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: if ol_supply_w_id != w_id: s_remote_cnt += 1 - self._client.run_query_json( + self._client.run_query_ignore_results( q["updateStock"].format( s_quantity, s_ytd.quantize(decimal.Decimal("1.00")), @@ -319,7 +322,7 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ol_amount = ol_quantity * i_price total += ol_amount - createOrderLine = q["createOrderLine"].format( + createOrderLineValues = q["createOrderLineValues"].format( d_next_o_id, d_id, w_id, @@ -331,7 +334,7 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ol_amount, s_dist_xx, ) - self._client.run_query_json(createOrderLine) + insert_value_strings.append(createOrderLineValues) ## Add the info to be returned item_data.append( @@ -339,8 +342,14 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: ) ## FOR + # Do one multivalue insert. + insertOrderLines = q["createOrderLineMultivalue"] + ", ".join( + insert_value_strings + ) + self._client.run_query_ignore_results(insertOrderLines) + ## Commit! - self._client.run_query_json("COMMIT") + self._client.run_query_ignore_results("COMMIT") ## Adjust the total for the discount # print "c_discount:", c_discount, type(c_discount) @@ -373,7 +382,7 @@ def doOrderStatus(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: c_id = params["c_id"] c_last = params["c_last"] - self._client.run_query_json("BEGIN") + self._client.run_query_ignore_results("BEGIN") if c_id != None: r, _ = self._client.run_query_json( q["getCustomerByCustomerId"].format(w_id, d_id, c_id) @@ -404,7 +413,7 @@ def doOrderStatus(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: else: orderLines = [] - self._client.run_query_json("COMMIT") + self._client.run_query_ignore_results("COMMIT") return [customer, order, orderLines] except Exception as ex: @@ -427,7 +436,7 @@ def doPayment(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: c_last = params["c_last"] h_date = params["h_date"] # Python datetime - self._client.run_query_json("BEGIN") + self._client.run_query_ignore_results("BEGIN") if c_id != None: r, _ = self._client.run_query_json( q["getCustomerByCustomerId"].format(w_id, d_id, c_id) @@ -456,10 +465,10 @@ def doPayment(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: r, _ = self._client.run_query_json(q["getDistrict"].format(w_id, d_id)) district = r[0] - self._client.run_query_json( + self._client.run_query_ignore_results( q["updateWarehouseBalance"].format(h_amount, w_id) ) - self._client.run_query_json( + self._client.run_query_ignore_results( q["updateDistrictBalance"].format(h_amount, w_id, d_id) ) @@ -480,10 +489,10 @@ def doPayment(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: c_d_id, c_id, ) - self._client.run_query_json(updateCustomer) + self._client.run_query_ignore_results(updateCustomer) else: c_data = "" - self._client.run_query_json( + self._client.run_query_ignore_results( q["updateGCCustomer"].format( c_balance, c_ytd_payment, c_payment_cnt, c_w_id, c_d_id, c_id ), @@ -502,9 +511,9 @@ def doPayment(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: h_amount.quantize(decimal.Decimal("1.00")), h_data, ) - self._client.run_query_json(insertHistory) + self._client.run_query_ignore_results(insertHistory) - self._client.run_query_json("COMMIT") + self._client.run_query_ignore_results("COMMIT") # TPC-C 2.5.3.3: Must display the following fields: # W_ID, D_ID, C_ID, C_D_ID, C_W_ID, W_STREET_1, W_STREET_2, W_CITY, W_STATE, W_ZIP, @@ -531,7 +540,7 @@ def doStockLevel(self, params: Dict[str, Any]) -> int: d_id = params["d_id"] threshold = params["threshold"] - self._client.run_query_json("BEGIN") + self._client.run_query_ignore_results("BEGIN") r, _ = self._client.run_query_json(q["getOId"].format(w_id, d_id)) result = r[0] assert result @@ -544,7 +553,7 @@ def doStockLevel(self, params: Dict[str, Any]) -> int: ) result = r[0] - self._client.run_query_json("COMMIT") + self._client.run_query_ignore_results("COMMIT") return int(result[0]) except Exception as ex: diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py index c25bce1c..f10f111f 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py @@ -37,6 +37,7 @@ import logging import os import pathlib +import numpy as np from datetime import datetime from pprint import pprint, pformat from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff @@ -62,6 +63,9 @@ def __init__(self, driver, scaleParameters, stop_on_error=False, pct_remote=0.1) self.total_workers = 1 self.worker_index = 0 + self.skew_alpha = None + self.skew_prng = None + ## DEF def execute( @@ -70,6 +74,7 @@ def execute( worker_index: int, total_workers: int, lat_sample_prob: float, + zipfian_alpha: Optional[float], ) -> results.Results: if RECORD_DETAILED_STATS_VAR in os.environ: import conductor.lib as cond @@ -115,6 +120,17 @@ def execute( *self.local_warehouse_range ) + if zipfian_alpha is not None: + self.skew_alpha = zipfian_alpha + self.skew_prng = np.random.default_rng(seed=42 ^ worker_index) + logging.info( + "Worker index %d - Selecting warehouse and items using a Zipfian distribution; a = %.2f", + worker_index, + self.skew_alpha, + ) + else: + logging.info("Worker index %d - Not using a Zipfian distribution") + r = results.Results(options) assert r logging.info("Executing benchmark for %d seconds" % duration) @@ -370,7 +386,19 @@ def makeWarehouseId(self): ): break else: - w_id = rand.number(*self.local_warehouse_range) + if self.skew_prng is not None: + # Skewed warehouse choice + min_warehouse, max_warehouse = self.local_warehouse_range + warehouse_span = max_warehouse - min_warehouse + 1 + while True: + # Chosen in range [1, inf) + candidate = self.skew_prng.zipf(a=self.skew_alpha) + if candidate <= warehouse_span: + break + return min_warehouse + (candidate - 1) + else: + # Uniformly randomly chosen warehouse + w_id = rand.number(*self.local_warehouse_range) assert w_id >= self.scaleParameters.starting_warehouse, ( "Invalid W_ID: %d" % w_id @@ -391,7 +419,14 @@ def makeCustomerId(self): ## DEF def makeItemId(self): - return rand.NURand(8191, 1, self.scaleParameters.items) + if self.skew_alpha is None: + return rand.NURand(8191, 1, self.scaleParameters.items) + else: + # Select item ID using a zipfian distribution. + while True: + candidate = self.skew_prng.zipf(a=self.skew_alpha) + if candidate <= self.scaleParameters.items: + return candidate ## DEF diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py b/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py index 027d4fb3..57ff0910 100755 --- a/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py @@ -218,7 +218,11 @@ def executorFunc( ) driver.executeStart() results = e.execute( - args["duration"], worker_index, total_workers, args["lat_sample_prob"] + args["duration"], + worker_index, + total_workers, + args["lat_sample_prob"], + args["zipfian_alpha"], ) driver.executeFinish() @@ -304,6 +308,11 @@ def executorFunc( default=0.1, help="The fraction of the transaction latencies to record.", ) + aparser.add_argument( + "--zipfian-alpha", + type=float, + help="The alpha parameter to use in a Zipfian distribution when selecting warehouse and item IDs.", + ) args = vars(aparser.parse_args()) if args["debug"]: @@ -386,6 +395,7 @@ def executorFunc( worker_index=0, total_workers=1, lat_sample_prob=args["lat_sample_prob"], + zipfian_alpha=args["zipfian_alpha"], ) driver.executeFinish() else: