Skip to content

Commit

Permalink
Fix inconsistencies between VCPU and R2 merge (#857)
Browse files Browse the repository at this point in the history
  • Loading branch information
sarahwooders authored Jun 6, 2023
1 parent 1cb5849 commit b265310
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 59 deletions.
5 changes: 0 additions & 5 deletions skyplane/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ class TransferConfig:
ibmcloud_instance_class: str = "bx2-2x8"
gcp_use_premium_network: bool = True

aws_vcpu_file: Path = aws_quota_path
gcp_vcpu_file: Path = gcp_quota_path
azure_vcpu_file: Path = azure_standardDv5_quota_path
# TODO: add ibmcloud when the quota info is available

# multipart config
multipart_enabled: bool = True
multipart_threshold_mb: int = 128
Expand Down
6 changes: 3 additions & 3 deletions skyplane/api/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ def __init__(
# planner
self.planning_algorithm = planning_algorithm
if self.planning_algorithm == "direct":
self.planner = MulticastDirectPlanner(self.max_instances, 64)
self.planner = MulticastDirectPlanner(self.max_instances, 64, self.transfer_config)
elif self.planning_algorithm == "src_one_sided":
self.planner = DirectPlannerSourceOneSided(self.max_instances, 64)
self.planner = DirectPlannerSourceOneSided(self.max_instances, 64, self.transfer_config)
elif self.planning_algorithm == "dst_one_sided":
self.planner = DirectPlannerDestOneSided(self.max_instances, 64)
self.planner = DirectPlannerDestOneSided(self.max_instances, 64, self.transfer_config)
else:
raise ValueError(f"No such planning algorithm {planning_algorithm}")

Expand Down
190 changes: 139 additions & 51 deletions skyplane/planner/planner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from importlib.resources import path
from typing import Dict, List, Optional, Tuple, Tuple
import re
import os

from skyplane import compute
from skyplane.api.config import TransferConfig
Expand All @@ -21,12 +22,30 @@
import json

from skyplane.utils.fn import do_parallel
from skyplane.config_paths import config_path, azure_standardDv5_quota_path, aws_quota_path, gcp_quota_path
from skyplane.config import SkyplaneConfig


class Planner:
# Only supporting "aws:m5.", "azure:StandardD_v5", and "gcp:n2-standard" instances for now
_VCPUS = (96, 64, 48, 32, 16, 8, 4, 2)

def __init__(self, transfer_config: TransferConfig):
self.transfer_config = transfer_config
self.config = SkyplaneConfig.load_config(config_path)

# Loading the quota information, add ibm cloud when it is supported
self.quota_limits = {}
if os.path.exists(aws_quota_path):
with aws_quota_path.open("r") as f:
self.quota_limits["aws"] = json.load(f)
if os.path.exists(azure_standardDv5_quota_path):
with azure_standardDv5_quota_path.open("r") as f:
self.quota_limits["azure"] = json.load(f)
if os.path.exists(gcp_quota_path):
with gcp_quota_path.open("r") as f:
self.quota_limits["gcp"] = json.load(f)

    def plan(self) -> TopologyPlan:
        """Compute a transfer topology. Abstract; concrete planner subclasses override this."""
        raise NotImplementedError

Expand Down Expand Up @@ -74,10 +93,10 @@ def _vcpus_to_vm(cloud_provider: str, vcpus: int) -> str:

class UnicastDirectPlanner(Planner):
# DO NOT USE THIS - broken for single-region transfers
    # NOTE(review): the next two `def __init__` headers look like diff residue —
    # an old signature immediately followed by its replacement. Two consecutive
    # `def` lines with no body is a syntax error; the first one should be deleted.
    def __init__(self, n_instances: int, n_connections: int):
    def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig):
        # number of gateway VMs to provision per region
        self.n_instances = n_instances
        # number of parallel connections per gateway
        self.n_connections = n_connections
        super().__init__(transfer_config)

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
# make sure only single destination
Expand Down Expand Up @@ -140,25 +159,10 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:


class MulticastDirectPlanner(Planner):
n_instances: int
n_connections: int
transfer_config: TransferConfig

    def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig):
        # number of gateway VMs to provision per region
        self.n_instances = n_instances
        # number of parallel connections per gateway
        self.n_connections = n_connections
        self.transfer_config = transfer_config

        # Loading the quota information, add ibm cloud when it is supported
        self.quota_limits = {}
        # NOTE(review): the aws/gcp/azure_vcpu_file fields were removed from
        # TransferConfig in this same change — these three reads look like
        # leftover deleted diff lines and would raise AttributeError at runtime;
        # confirm and drop them (Planner.__init__ now loads the quota files).
        with self.transfer_config.aws_vcpu_file.open("r") as f:
            self.quota_limits["aws"] = json.load(f)
        with self.transfer_config.gcp_vcpu_file.open("r") as f:
            self.quota_limits["gcp"] = json.load(f)
        with self.transfer_config.azure_vcpu_file.open("r") as f:
            self.quota_limits["azure"] = json.load(f)

        # NOTE(review): two consecutive super().__init__ calls — the no-arg one
        # is a leftover removed line; Planner.__init__ requires transfer_config,
        # so the first call would raise TypeError. Keep only the second.
        super().__init__()
        super().__init__(transfer_config)

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
src_region_tag = jobs[0].src_iface.region_tag()
Expand Down Expand Up @@ -329,14 +333,9 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[int, int]]:
return (vcpus, n_instances)


class DirectPlannerSourceOneSided(Planner):
class DirectPlannerSourceOneSided(MulticastDirectPlanner):
"""Planner that only creates VMs in the source region"""

def __init__(self, n_instances: int, n_connections: int):
self.n_instances = n_instances
self.n_connections = n_connections
super().__init__()

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
src_region_tag = jobs[0].src_iface.region_tag()
dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces]
Expand Down Expand Up @@ -391,13 +390,124 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
return plan


class DirectPlannerDestOneSided(Planner):
class DirectPlannerDestOneSided(MulticastDirectPlanner):
"""Planner that only creates instances in the destination region"""

def __init__(self, n_instances: int, n_connections: int):
self.n_instances = n_instances
self.n_connections = n_connections
super().__init__()
def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
# only create in destination region
src_region_tag = jobs[0].src_iface.region_tag()
dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces]
# jobs must have same sources and destinations
for job in jobs[1:]:
assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region"
assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set"

plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags)
# TODO: use VM limits to determine how many instances to create in each region
# TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions
for i in range(self.n_instances):
for dst_region_tag in dst_region_tags:
plan.add_gateway(dst_region_tag)

# initialize gateway programs per region
dst_program = {dst_region: GatewayProgram() for dst_region in dst_region_tags}

# iterate through all jobs
for job in jobs:
src_bucket = job.src_iface.bucket()
src_region_tag = job.src_iface.region_tag()
src_provider = src_region_tag.split(":")[0]

partition_id = jobs.index(job)

# send to all destination
dst_prefixes = job.dst_prefixes
for i in range(len(job.dst_ifaces)):
dst_iface = job.dst_ifaces[i]
dst_prefix = dst_prefixes[i]
dst_region_tag = dst_iface.region_tag()
dst_bucket = dst_iface.bucket()
dst_gateways = plan.get_region_gateways(dst_region_tag)

# source region gateway program
obj_store_read = dst_program[dst_region_tag].add_operator(
GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id
)

dst_program[dst_region_tag].add_operator(
GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix),
parent_handle=obj_store_read,
partition_id=partition_id,
)

# update cost per GB
plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag)

# set gateway programs
for dst_region_tag, program in dst_program.items():
plan.set_gateway_program(dst_region_tag, program)
return plan


class DirectPlannerSourceOneSided(MulticastDirectPlanner):
    """Planner that only creates VMs in the source region.

    The source gateways read from the source object store, fan out via a
    multiplexer, and write directly into every destination bucket.
    """

    def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
        """Build a topology with gateways only in the source region.

        :param jobs: transfer jobs to plan; each job gets its own partition id
        :return: planned topology with a single source-region gateway program
        :raises AssertionError: if jobs disagree on source region or destination set
        """
        src_region_tag = jobs[0].src_iface.region_tag()
        dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces]
        # jobs must have same sources and destinations
        for job in jobs[1:]:
            assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region"
            assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "All jobs must have same destination set"

        plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags)
        # TODO: use VM limits to determine how many instances to create in each region
        # TODO: support one-sided transfers without requiring VMs to be created in source/destination regions
        for _ in range(self.n_instances):
            plan.add_gateway(src_region_tag)

        # single gateway program shared by all source-region gateways
        src_program = GatewayProgram()

        # give each job a distinct partition id via enumerate, so we can
        # read/write to different buckets (avoids the O(n) jobs.index(job)
        # lookup, which also misbehaves for duplicate jobs)
        for partition_id, job in enumerate(jobs):
            src_bucket = job.src_iface.bucket()
            src_region_tag = job.src_iface.region_tag()

            # source region gateway program
            obj_store_read = src_program.add_operator(
                GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id
            )
            # fan the read chunks out to all destinations
            mux_and = src_program.add_operator(GatewayMuxAnd(), parent_handle=obj_store_read, partition_id=partition_id)
            for dst_iface, dst_prefix in zip(job.dst_ifaces, job.dst_prefixes):
                dst_region_tag = dst_iface.region_tag()
                dst_bucket = dst_iface.bucket()

                # write directly from the source gateways into each destination
                # bucket (this also covers destinations in the source region)
                src_program.add_operator(
                    GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix),
                    parent_handle=mux_and,
                    partition_id=partition_id,
                )
                # update cost per GB
                plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag)

        # set gateway programs
        plan.set_gateway_program(src_region_tag, src_program)
        return plan


class DirectPlannerDestOneSided(MulticastDirectPlanner):
"""Planner that only creates instances in the destination region"""

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
# only create in destination region
Expand Down Expand Up @@ -456,42 +566,20 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:


class UnicastILPPlanner(Planner):
    """Placeholder for an ILP-based single-destination planner (not implemented)."""

    def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float):
        # number of gateway VMs per region
        self.n_instances = n_instances
        # parallel connections per gateway
        self.n_connections = n_connections
        # throughput target the solver must satisfy
        self.solver_required_throughput_gbits = required_throughput_gbits
        # NOTE(review): Planner.__init__ now requires transfer_config, so this
        # no-arg call would raise TypeError — likely a leftover old line from
        # the diff; confirm against the merged file.
        super().__init__()

    def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
        raise NotImplementedError("ILP solver not implemented yet")


class MulticastILPPlanner(Planner):
    """Placeholder for an ILP-based multi-destination planner (not implemented)."""

    def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float):
        # number of gateway VMs per region
        self.n_instances = n_instances
        # parallel connections per gateway
        self.n_connections = n_connections
        # throughput target the solver must satisfy
        self.solver_required_throughput_gbits = required_throughput_gbits
        # NOTE(review): Planner.__init__ now requires transfer_config, so this
        # no-arg call would raise TypeError — likely a leftover old line from
        # the diff; confirm against the merged file.
        super().__init__()

    def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
        raise NotImplementedError("ILP solver not implemented yet")


class MulticastMDSTPlanner(Planner):
    """Placeholder for a minimum-directed-spanning-tree planner (not implemented)."""

    def __init__(self, n_instances: int, n_connections: int):
        # number of gateway VMs per region
        self.n_instances = n_instances
        # parallel connections per gateway
        self.n_connections = n_connections
        # NOTE(review): Planner.__init__ now requires transfer_config, so this
        # no-arg call would raise TypeError — likely a leftover old line from
        # the diff; confirm against the merged file.
        super().__init__()

    def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
        raise NotImplementedError("MDST solver not implemented yet")


class MulticastSteinerTreePlanner(Planner):
    """Placeholder for a Steiner-tree-based multicast planner (not implemented)."""

    def __init__(self, n_instances: int, n_connections: int):
        # number of gateway VMs per region
        self.n_instances = n_instances
        # parallel connections per gateway
        self.n_connections = n_connections
        # NOTE(review): Planner.__init__ now requires transfer_config, so this
        # no-arg call would raise TypeError — likely a leftover old line from
        # the diff; confirm against the merged file.
        super().__init__()

    def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
        raise NotImplementedError("Steiner tree solver not implemented yet")

0 comments on commit b265310

Please sign in to comment.