diff --git a/skyplane/api/config.py b/skyplane/api/config.py index 4e4979815..d6a049d54 100644 --- a/skyplane/api/config.py +++ b/skyplane/api/config.py @@ -89,11 +89,6 @@ class TransferConfig: ibmcloud_instance_class: str = "bx2-2x8" gcp_use_premium_network: bool = True - aws_vcpu_file: Path = aws_quota_path - gcp_vcpu_file: Path = gcp_quota_path - azure_vcpu_file: Path = azure_standardDv5_quota_path - # TODO: add ibmcloud when the quota info is available - # multipart config multipart_enabled: bool = True multipart_threshold_mb: int = 128 diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index 5471ebf55..929a18832 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -68,11 +68,11 @@ def __init__( # planner self.planning_algorithm = planning_algorithm if self.planning_algorithm == "direct": - self.planner = MulticastDirectPlanner(self.max_instances, 64) + self.planner = MulticastDirectPlanner(self.max_instances, 64, self.transfer_config) elif self.planning_algorithm == "src_one_sided": - self.planner = DirectPlannerSourceOneSided(self.max_instances, 64) + self.planner = DirectPlannerSourceOneSided(self.max_instances, 64, self.transfer_config) elif self.planning_algorithm == "dst_one_sided": - self.planner = DirectPlannerDestOneSided(self.max_instances, 64) + self.planner = DirectPlannerDestOneSided(self.max_instances, 64, self.transfer_config) else: raise ValueError(f"No such planning algorithm {planning_algorithm}") diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 59eeac33e..6631de3ce 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -1,6 +1,7 @@ from importlib.resources import path from typing import Dict, List, Optional, Tuple, Tuple import re +import os from skyplane import compute from skyplane.api.config import TransferConfig @@ -21,12 +22,30 @@ import json from skyplane.utils.fn import do_parallel +from skyplane.config_paths import config_path, azure_standardDv5_quota_path, aws_quota_path, gcp_quota_path +from skyplane.config import SkyplaneConfig class Planner: # Only supporting "aws:m5.", "azure:StandardD_v5", and "gcp:n2-standard" instances for now _VCPUS = (96, 64, 48, 32, 16, 8, 4, 2) + def __init__(self, transfer_config: TransferConfig): + self.transfer_config = transfer_config + self.config = SkyplaneConfig.load_config(config_path) + + # Loading the quota information, add ibm cloud when it is supported + self.quota_limits = {} + if os.path.exists(aws_quota_path): + with aws_quota_path.open("r") as f: + self.quota_limits["aws"] = json.load(f) + if os.path.exists(azure_standardDv5_quota_path): + with azure_standardDv5_quota_path.open("r") as f: + self.quota_limits["azure"] = json.load(f) + if os.path.exists(gcp_quota_path): + with gcp_quota_path.open("r") as f: + self.quota_limits["gcp"] = json.load(f) + def plan(self) -> TopologyPlan: raise NotImplementedError @@ -74,10 +93,10 @@ def _vcpus_to_vm(cloud_provider: str, vcpus: int) -> str: class UnicastDirectPlanner(Planner): # DO NOT USE THIS - broken for single-region transfers - def __init__(self, n_instances: int, n_connections: int): + def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig): self.n_instances = n_instances self.n_connections = n_connections - super().__init__() + super().__init__(transfer_config) def plan(self, jobs: List[TransferJob]) -> TopologyPlan: # make sure only single destination @@ -140,25 +159,10 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: class MulticastDirectPlanner(Planner): - n_instances: int - n_connections: int - transfer_config: TransferConfig - def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig): self.n_instances = n_instances self.n_connections = n_connections - self.transfer_config = transfer_config - - # Loading the quota information, add ibm cloud when it is supported - self.quota_limits = {} - with self.transfer_config.aws_vcpu_file.open("r") as f: - self.quota_limits["aws"] = json.load(f) - with self.transfer_config.gcp_vcpu_file.open("r") as f: - self.quota_limits["gcp"] = json.load(f) - with self.transfer_config.azure_vcpu_file.open("r") as f: - self.quota_limits["azure"] = json.load(f) - - super().__init__() + super().__init__(transfer_config) def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = jobs[0].src_iface.region_tag() @@ -329,14 +333,9 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[int, int]]: return (vcpus, n_instances) -class DirectPlannerSourceOneSided(Planner): +class DirectPlannerSourceOneSided(MulticastDirectPlanner): """Planner that only creates VMs in the source region""" - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = jobs[0].src_iface.region_tag() dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] @@ -391,13 +390,124 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: return plan -class DirectPlannerDestOneSided(Planner): +class DirectPlannerDestOneSided(MulticastDirectPlanner): """Planner that only creates instances in the destination region""" - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + # only create in destination region + src_region_tag = jobs[0].src_iface.region_tag() + dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] + # jobs must have same sources and destinations + for job in jobs[1:]: + assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" + assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" + + plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) + # TODO: use VM limits to determine how many instances to create in each region + # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions + for i in range(self.n_instances): + for dst_region_tag in dst_region_tags: + plan.add_gateway(dst_region_tag) + + # initialize gateway programs per region + dst_program = {dst_region: GatewayProgram() for dst_region in dst_region_tags} + + # iterate through all jobs + for job in jobs: + src_bucket = job.src_iface.bucket() + src_region_tag = job.src_iface.region_tag() + src_provider = src_region_tag.split(":")[0] + + partition_id = jobs.index(job) + + # send to all destination + dst_prefixes = job.dst_prefixes + for i in range(len(job.dst_ifaces)): + dst_iface = job.dst_ifaces[i] + dst_prefix = dst_prefixes[i] + dst_region_tag = dst_iface.region_tag() + dst_bucket = dst_iface.bucket() + dst_gateways = plan.get_region_gateways(dst_region_tag) + + # source region gateway program + obj_store_read = dst_program[dst_region_tag].add_operator( + GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id + ) + + dst_program[dst_region_tag].add_operator( + GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix), + parent_handle=obj_store_read, + partition_id=partition_id, + ) + + # update cost per GB + plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag) + + # set gateway programs + for dst_region_tag, program in dst_program.items(): + plan.set_gateway_program(dst_region_tag, program) + return plan + + +class DirectPlannerSourceOneSided(MulticastDirectPlanner): + """Planner that only creates VMs in the source region""" + + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + src_region_tag = jobs[0].src_iface.region_tag() + dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] + # jobs must have same sources and destinations + for job in jobs[1:]: + assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" + assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" + + plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) + # TODO: use VM limits to determine how many instances to create in each region + # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions + for i in range(self.n_instances): + plan.add_gateway(src_region_tag) + + # initialize gateway programs per region + src_program = GatewayProgram() + + # iterate through all jobs + for job in jobs: + src_bucket = job.src_iface.bucket() + src_region_tag = job.src_iface.region_tag() + src_provider = src_region_tag.split(":")[0] + + # give each job a different partition id, so we can read/write to different buckets + partition_id = jobs.index(job) + + # source region gateway program + obj_store_read = src_program.add_operator( + GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id + ) + # send to all destination + mux_and = src_program.add_operator(GatewayMuxAnd(), parent_handle=obj_store_read, partition_id=partition_id) + dst_prefixes = job.dst_prefixes + for i in range(len(job.dst_ifaces)): + dst_iface = job.dst_ifaces[i] + dst_prefix = dst_prefixes[i] + dst_region_tag = dst_iface.region_tag() + dst_bucket = dst_iface.bucket() + dst_gateways = plan.get_region_gateways(dst_region_tag) + + # special case where destination is same region as source + src_program.add_operator( + GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix), + parent_handle=mux_and, + partition_id=partition_id, + ) + # update cost per GB + plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag) + + # set gateway programs + plan.set_gateway_program(src_region_tag, src_program) + return plan + + +class DirectPlannerDestOneSided(MulticastDirectPlanner): + """Planner that only creates instances in the destination region""" def plan(self, jobs: List[TransferJob]) -> TopologyPlan: # only create in destination region @@ -456,42 +566,20 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: class UnicastILPPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float): - self.n_instances = n_instances - self.n_connections = n_connections - self.solver_required_throughput_gbits = required_throughput_gbits - super().__init__() - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: raise NotImplementedError("ILP solver not implemented yet") class MulticastILPPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float): - self.n_instances = n_instances - self.n_connections = n_connections - self.solver_required_throughput_gbits = required_throughput_gbits - super().__init__() - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: raise NotImplementedError("ILP solver not implemented yet") class MulticastMDSTPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: raise NotImplementedError("MDST solver not implemented yet") class MulticastSteinerTreePlanner(Planner): - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: raise NotImplementedError("Steiner tree solver not implemented yet")