
Commit

Merge branch 'main' into deprovisioning
abiswal2001 authored Jul 21, 2023
2 parents b8bdf02 + 73574e5 commit 4dc7752
Showing 23 changed files with 333 additions and 88 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration-test-local.yml
@@ -97,10 +97,10 @@ jobs:
poetry run skyplane init -y --disable-config-azure --disable-config-cloudflare
poetry run skyplane config set usage_stats false
- name: Deprovision
run: poetry run skyplane deprovision
run: poetry run skyplane deprovision --all
- name: Delete matching S3 buckets
run: |
for pattern in "test-skyplane-" "skyplane-integration-us-east-1-" "integrationus-east-1-"; do
for pattern in "test-skyplane-" "skyplane-integration-" "integrationus-east-1-"; do
aws s3api list-buckets --query "Buckets[?starts_with(Name, \`${pattern}\`) == \`true\`].Name" --output text | tr '\t' '\n' | while read bucket; do aws s3 rb "s3://$bucket" --force; done
done
- name: Cleanup GCP service account
14 changes: 7 additions & 7 deletions .github/workflows/integration-test-multiple-sizes.yml
@@ -21,15 +21,15 @@ jobs:
- gcp:us-central1-a gcp:us-central1-a --multipart
- gcp:us-west1-a gcp:us-east1-a --multipart
# Azure to Azure
- azure:westus azure:westus
- azure:eastus azure:westus
- azure:westus azure:westus --multipart
- azure:eastus azure:westus --multipart
# cross cloud tests
- aws:us-west-1 gcp:us-west2-a --multipart
- gcp:us-west2-a aws:us-west-1 --multipart
- aws:us-west-1 azure:westus
- azure:westus aws:us-west-1
- gcp:us-west2-a azure:westus
- azure:westus gcp:us-west2-a
- aws:us-west-1 azure:westus --multipart
- azure:westus aws:us-west-1 --multipart
- gcp:us-west2-a azure:westus --multipart
- azure:westus gcp:us-west2-a --multipart
timeout-minutes: 40
env:
STRATEGY_UUID: itest-${{ github.run_id }}-${{ github.run_attempt }}-${{ strategy.job-index }}
@@ -104,7 +104,7 @@ jobs:
run: poetry run skyplane deprovision
- name: Delete matching S3 buckets
run: |
for pattern in "test-skyplane-" "skyplane-integration-us-east-1-" "integrationus-east-1-"; do
for pattern in "test-skyplane-" "skyplane-integration-" "integrationus-east-1-"; do
aws s3api list-buckets --query "Buckets[?starts_with(Name, \`${pattern}\`) == \`true\`].Name" --output text | tr '\t' '\n' | while read bucket; do aws s3 rb "s3://$bucket" --force; done
done
- name: Cleanup GCP service account
4 changes: 2 additions & 2 deletions .github/workflows/integration-test.yml
@@ -94,10 +94,10 @@ jobs:
poetry run skyplane init -y --disable-config-azure
poetry run skyplane config set usage_stats false
- name: Deprovision
run: poetry run skyplane deprovision
run: poetry run skyplane deprovision --all
- name: Delete matching S3 buckets
run: |
for pattern in "test-skyplane-" "skyplane-integration-us-east-1-" "integrationus-east-1-"; do
for pattern in "test-skyplane-" "skyplane-integration-" "integrationus-east-1-"; do
aws s3api list-buckets --query "Buckets[?starts_with(Name, \`${pattern}\`) == \`true\`].Name" --output text | tr '\t' '\n' | while read bucket; do aws s3 rb "s3://$bucket" --force; done
done
- name: Cleanup GCP service account
95 changes: 95 additions & 0 deletions .github/workflows/pytest_integration.yml
@@ -0,0 +1,95 @@
name: pytest-integration
on:
workflow_dispatch:
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
SKYPLANE_USAGE_STATS_ENABLED: 0
jobs:
pytest-integration:
if: ${{ always() }}
runs-on: ubuntu-latest
strategy:
max-parallel: 8
timeout-minutes: 40
env:
STRATEGY_UUID: itest-${{ github.run_id }}-${{ github.run_attempt }}-${{ strategy.job-index }}
steps:
- uses: actions/checkout@v1
- name: Install poetry
run: pipx install poetry
- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "poetry"
- name: Set Poetry config
run: |
poetry config virtualenvs.in-project false
poetry config virtualenvs.path ~/.virtualenvs
- name: Install Dependencies
run: |
poetry install -E gateway -E solver -E aws -E azure -E gcp -E ibm
poetry run pip install -r requirements-dev.txt
if: steps.cache.outputs.cache-hit != 'true'
- id: 'auth'
uses: 'google-github-actions/auth@v1'
with:
credentials_json: '${{ secrets.GCP_CREDENTIALS_JSON }}'
- name: Log into Azure
uses: azure/login@v1
with:
creds: '{"clientId":"${{ secrets.AZURE_CLIENT_ID }}","clientSecret":"${{ secrets.AZURE_CLIENT_SECRET }}","subscriptionId":"${{ secrets.AZURE_SUBSCRIPTION_ID }}","tenantId":"${{ secrets.AZURE_TENANT_ID }}"}'
- name: Skyplane init
run: |
poetry run skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }}
poetry run skyplane config set native_cmd_enabled false
cat ~/.skyplane/config
poetry run skyplane init -y --disable-config-cloudflare
poetry run skyplane config set usage_stats false
- name: Run integration tests
run: poetry run pytest -s tests/integration/test_cp.py
- name: Cleanup GCP service account
if: always()
run: gcloud iam service-accounts delete ${{ env.STRATEGY_UUID }}@${{ secrets.GCP_PROJECT_ID }}.iam.gserviceaccount.com
deprovision:
runs-on: ubuntu-latest
if: ${{ always() }}
needs: [pytest-integration]
env:
STRATEGY_UUID: itest-d-${{ github.run_id }}-${{ github.run_attempt }}
steps:
- uses: actions/checkout@v1
- name: Install poetry
run: pipx install poetry
- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "poetry"
- name: Set Poetry config
run: |
poetry config virtualenvs.in-project false
poetry config virtualenvs.path ~/.virtualenvs
- name: Install dependencies
run: poetry install -E aws -E azure -E gcp
if: steps.cache.outputs.cache-hit != 'true'
- id: 'auth'
uses: 'google-github-actions/auth@v1'
with:
credentials_json: '${{ secrets.GCP_CREDENTIALS_JSON }}'
- name: Skyplane init
run: |
poetry run skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }}
poetry run skyplane init -y --disable-config-azure --disable-config-cloudflare
poetry run skyplane config set usage_stats false
- name: Deprovision
run: poetry run skyplane deprovision --all
- name: Delete matching S3 buckets
run: |
for pattern in "test-skyplane-" "skyplane-integration-" "integrationus-east-1-"; do
aws s3api list-buckets --query "Buckets[?starts_with(Name, \`${pattern}\`) == \`true\`].Name" --output text | tr '\t' '\n' | while read bucket; do aws s3 rb "s3://$bucket" --force; done
done
- name: Cleanup GCP service account
if: always()
run: gcloud iam service-accounts delete --quiet ${{ env.STRATEGY_UUID }}@${{ secrets.GCP_PROJECT_ID }}.iam.gserviceaccount.com
7 changes: 6 additions & 1 deletion docs/debugging.md
@@ -33,4 +33,9 @@ skyplane ssh
```
which will list available gateways that you can select to ssh into.

Once you've ssh-ed into a gateway instance, you can interact with the Skyplane docker image (??).
Once you've ssh-ed into a gateway instance, you can interact with the Skyplane docker image (??).

## Running transfers locally
Skyplane supports testing the gateway containers locally, without provisioning cloud VM instances. You can run Skyplane in local mode using the `--local` flag, which runs the gateway Docker containers on your current machine.

(TODO: finish writing)
4 changes: 2 additions & 2 deletions docs/index.rst
@@ -24,8 +24,7 @@ Contents
quickstart
configure
architecture
performance_stats_collection
on-prem_setup
permissions
faq

.. toctree::
@@ -56,6 +55,7 @@ Contents

benchmark
performance_stats_collection
on-prem_setup
roadmap

.. toctree::
26 changes: 26 additions & 0 deletions docs/permissions.md
@@ -0,0 +1,26 @@
# Required Permissions
Skyplane runs in your own cloud accounts and does not interact with an external service to run transfers. However, the cloud account you are running Skyplane in must have permissions to interact with object storage and to create VMs to execute transfers.

## AWS
Your AWS account must have the following permissions:
* `Storage`
* `Multipart`
* `VM`


## GCP
Your GCP account must have the following permissions:
* `CreateServiceAccount`
* `ObjectStore`
* `VM`

For GCP, Skyplane creates a service account that has permissions to read from and write to GCP object stores.

TODO: check to see if removing obj store permissions from GCP service account still lets service credentials read.

## Azure
Your Azure account must have the following roles:
* `Storage Blob Data Contributor`
* `Storage Account Contributor`

NOTE: Within Azure, the `Owner` role alone is not sufficient to access and write to containers in storage. The VMs that Skyplane provisions are assigned sufficient storage permissions, but to interact with Azure storage locally, make sure your personal Azure account has the roles listed above.
24 changes: 24 additions & 0 deletions docs/programmable_gateway.md
@@ -0,0 +1,24 @@
# Customizing the Overlay Network
Skyplane can be modified to support customizable overlay networks.

## Gateway Program
The gateway program defines how data is processed by a single gateway (cloud VM): which object stores to read from and write to, and which other gateways to send data to and receive data from.

Currently, gateway programs can compose operators such as `ObjectStoreRead`, `ObjectStoreWrite`, and `MuxAnd`.

For example, we can write a gateway program for a single VM that reads from bucket `src_bucket` and writes data to both `dst_bucket_1` and `dst_bucket_2` with the following code:

```
program = GatewayProgram()
# read from object store
read_op = program.add_operator(ObjectStoreRead(bucket="src_bucket"))
# send read data to both child operators (write1_op, write2_op)
and_op = program.add_operator(MuxAnd(parent=read_op))
# write to object stores
write1_op = program.add_operator(ObjectStoreWrite(bucket="dst_bucket_1"), parent=and_op)
write2_op = program.add_operator(ObjectStoreWrite(bucket="dst_bucket_2"), parent=and_op)
print(program.to_json()) # JSON representation of program
```

## Planners
Skyplane's planner determines the structure of the overlay network and how data is routed through it.
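
Internally, the `planning_algorithm` string passed to the transfer pipeline selects one of these planner classes. Below is a minimal sketch of that dispatch, mirroring the logic in `skyplane/api/pipeline.py`; the import paths are assumptions for illustration and may differ from the actual module layout.

```
# Sketch: map a planning_algorithm string to a planner instance
# (mirrors the dispatch in skyplane/api/pipeline.py; import paths are assumed)
from skyplane.api.config import TransferConfig
from skyplane.planner.planner import (
    MulticastDirectPlanner,
    DirectPlannerSourceOneSided,
    DirectPlannerDestOneSided,
)

def make_planner(planning_algorithm: str, max_instances: int, n_connections: int, transfer_config: TransferConfig):
    if planning_algorithm == "direct":
        return MulticastDirectPlanner(max_instances, n_connections, transfer_config)
    elif planning_algorithm == "src_one_sided":
        return DirectPlannerSourceOneSided(max_instances, n_connections, transfer_config)
    elif planning_algorithm == "dst_one_sided":
        return DirectPlannerDestOneSided(max_instances, n_connections, transfer_config)
    raise ValueError(f"No such planning algorithm {planning_algorithm}")
```

The number of connections per gateway was previously hard-coded to 64; this commit threads it through as the `n_connections` parameter on the pipeline.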
10 changes: 9 additions & 1 deletion docs/quickstart.md
@@ -15,9 +15,17 @@ skyplane sync s3://... gs://...
You can also run skyplane from a Python API client. To copy a single object or folder, you can run:
```
import skyplane
client = skyplane.SkyplaneClient()
# single destination
client.copy(src="s3://bucket-src/key", dst="s3://bucket-dst/key", recursive=False)
# multiple destinations
client.copy(
src="s3://bucket-src/key",
dst=["s3://bucket-dst1/key", "s3://bucket-dst2/key"],
recursive=False
)
```
This will create a Skyplane dataplane (i.e., a cluster of gateway VMs), execute the transfer, and tear down the cluster upon completion.
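
To transfer an entire folder (prefix) rather than a single object, the same call can be made with `recursive=True`. A minimal sketch, assuming placeholder bucket names and prefixes:

```
import skyplane

client = skyplane.SkyplaneClient()
# copy every object under the source prefix to the destination prefix
client.copy(src="s3://bucket-src/prefix/", dst="gs://bucket-dst/prefix/", recursive=True)
```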

3 changes: 2 additions & 1 deletion docs/summary.md
@@ -25,6 +25,7 @@ Skyplane currently supports the following source and destination endpoints (any
| AWS S3 |||
| Google Storage |||
| Azure Blob Storage |||
| Cloudflare R2 |||
| Local Disk || (in progress) |

Skyplane is an actively developed project. It will have 🔪 SHARP EDGES 🔪. Please file an issue or ask the contributors via [the #help channel on our Slack](https://join.slack.com/t/skyplaneworkspace/shared_invite/zt-1cxmedcuc-GwIXLGyHTyOYELq7KoOl6Q) if you encounter bugs.
Skyplane is an actively developed project. It will have 🔪 SHARP EDGES 🔪. Please file an issue or ask the contributors via [the #help channel on our Slack](https://join.slack.com/t/skyplaneworkspace/shared_invite/zt-1cxmedcuc-GwIXLGyHTyOYELq7KoOl6Q) if you encounter bugs.
2 changes: 1 addition & 1 deletion skyplane/api/dataplane.py
@@ -119,7 +119,7 @@ def _start_gateway(
gateway_docker_image=gateway_docker_image,
gateway_program_path=str(gateway_program_filename),
gateway_info_path=f"{gateway_log_dir}/gateway_info.json",
e2ee_key_bytes=None, # TODO: remove
e2ee_key_bytes=e2ee_key_bytes, # TODO: remove
use_bbr=self.transfer_config.use_bbr, # TODO: remove
use_compression=self.transfer_config.use_compression,
use_socket_tls=self.transfer_config.use_socket_tls,
20 changes: 17 additions & 3 deletions skyplane/api/pipeline.py
@@ -39,6 +39,7 @@ def __init__(
transfer_config: TransferConfig,
# cloud_regions: dict,
max_instances: Optional[int] = 1,
n_connections: Optional[int] = 64,
planning_algorithm: Optional[str] = "direct",
debug: Optional[bool] = False,
):
@@ -54,6 +55,7 @@ def __init__(
# self.cloud_regions = cloud_regions
# TODO: set max instances with VM CPU limits and/or config
self.max_instances = max_instances
self.n_connections = n_connections
self.provisioner = provisioner
self.transfer_config = transfer_config
self.http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=3))
Expand All @@ -68,11 +70,11 @@ def __init__(
# planner
self.planning_algorithm = planning_algorithm
if self.planning_algorithm == "direct":
self.planner = MulticastDirectPlanner(self.max_instances, 64, self.transfer_config)
self.planner = MulticastDirectPlanner(self.max_instances, self.n_connections, self.transfer_config)
elif self.planning_algorithm == "src_one_sided":
self.planner = DirectPlannerSourceOneSided(self.max_instances, 64, self.transfer_config)
self.planner = DirectPlannerSourceOneSided(self.max_instances, self.n_connections, self.transfer_config)
elif self.planning_algorithm == "dst_one_sided":
self.planner = DirectPlannerDestOneSided(self.max_instances, 64, self.transfer_config)
self.planner = DirectPlannerDestOneSided(self.max_instances, self.n_connections, self.transfer_config)
else:
raise ValueError(f"No such planning algorithm {planning_algorithm}")

@@ -121,6 +123,18 @@ def start(self, debug=False, progress=False):
dp.deprovision(spinner=True)
return dp

def start_async(self, debug=False):
dp = self.create_dataplane(debug)
try:
dp.provision(spinner=False)
tracker = dp.run_async(self.jobs_to_dispatch)
if debug:
dp.copy_gateway_logs()
return tracker
except Exception as e:
dp.copy_gateway_logs()
return

def queue_copy(
self,
src: str,
38 changes: 23 additions & 15 deletions skyplane/api/transfer_job.py
@@ -26,12 +26,8 @@
from skyplane import exceptions
from skyplane.api.config import TransferConfig
from skyplane.chunk import Chunk, ChunkRequest
from skyplane.obj_store.azure_blob_interface import AzureBlobInterface, AzureBlobObject
from skyplane.obj_store.gcs_interface import GCSObject
from skyplane.obj_store.r2_interface import R2Object
from skyplane.obj_store.storage_interface import StorageInterface
from skyplane.obj_store.object_store_interface import ObjectStoreObject, ObjectStoreInterface
from skyplane.obj_store.s3_interface import S3Object
from skyplane.utils import logger
from skyplane.utils.definitions import MB
from skyplane.utils.fn import do_parallel
@@ -184,7 +180,9 @@ def _run_multipart_chunk_thread(

metadata = None
# Convert parts to base64 and store mime_type if destination interface is AzureBlobInterface
if isinstance(dest_iface, AzureBlobInterface):
if dest_iface.provider == "azure":
from skyplane.obj_store.azure_blob_interface import AzureBlobInterface

block_ids = list(map(lambda part_num: AzureBlobInterface.id_to_base64_encoding(part_num, dest_object.key), parts))
metadata = (block_ids, mime_type)

@@ -317,8 +315,25 @@ def transfer_pair_generator(
except exceptions.MissingObjectException as e:
logger.fs.exception(e)
raise e from None

if dest_provider == "aws":
from skyplane.obj_store.s3_interface import S3Object

dest_obj = S3Object(provider=dest_provider, bucket=dst_iface.bucket(), key=dest_key)
elif dest_provider == "azure":
from skyplane.obj_store.azure_blob_interface import AzureBlobObject

dest_obj = AzureBlobObject(provider=dest_provider, bucket=dst_iface.bucket(), key=dest_key)
elif dest_provider == "gcp":
from skyplane.obj_store.gcs_interface import GCSObject

dest_obj = dst_iface.create_object_repr(dest_key)
dest_obj = GCSObject(provider=dest_provider, bucket=dst_iface.bucket(), key=dest_key)
elif dest_provider == "cloudflare":
from skyplane.obj_store.r2_interface import R2Object

dest_obj = R2Object(provider=dest_provider, bucket=dst_iface.bucket(), key=dest_key)
else:
raise ValueError(f"Invalid dest_region {dest_region}, unknown provider")
dest_objs[dst_iface.region_tag()] = dest_obj

# assert that all destinations share the same post-fix key
@@ -341,11 +356,8 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) ->
multipart_exit_event = threading.Event()
multipart_chunk_threads = []

# TODO: remove after azure multipart implemented
azure_dest = any([dst_iface.provider == "azure" for dst_iface in self.dst_ifaces])

# start chunking threads
if not azure_dest and self.transfer_config.multipart_enabled:
if self.transfer_config.multipart_enabled:
for _ in range(self.concurrent_multipart_chunk_threads):
t = threading.Thread(
target=self._run_multipart_chunk_thread,
Expand All @@ -359,11 +371,7 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) ->
for transfer_pair in transfer_pair_generator:
# print("transfer_pair", transfer_pair.src_obj.key, transfer_pair.dst_objs)
src_obj = transfer_pair.src_obj
if (
not azure_dest
and self.transfer_config.multipart_enabled
and src_obj.size > self.transfer_config.multipart_threshold_mb * MB
):
if self.transfer_config.multipart_enabled and src_obj.size > self.transfer_config.multipart_threshold_mb * MB:
multipart_send_queue.put(transfer_pair)
else:
if transfer_pair.src_obj.size == 0: