Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions images/container-client-test/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,60 @@ const server = createServer(function (req, res) {
return;
}

// Write a file inside the container (for snapshot testing)
if (req.url.startsWith('/write-file')) {
const url = new URL(req.url, 'http://localhost');
const filePath = url.searchParams.get('path');
if (!filePath) {
res.writeHead(400, { 'Content-Type': 'text/plain' });
res.write('Missing "path" query param');
res.end();
return;
}
const fs = require('fs');
const path = require('path');
let body = '';
req.on('data', (chunk) => (body += chunk));
req.on('end', () => {
try {
fs.mkdirSync(path.dirname(filePath), { recursive: true });
fs.writeFileSync(filePath, body);
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.write('ok');
res.end();
} catch (err) {
res.writeHead(500, { 'Content-Type': 'text/plain' });
res.write(err.message);
res.end();
}
});
return;
}

// Read a file inside the container (for snapshot testing)
if (req.url.startsWith('/read-file')) {
const url = new URL(req.url, 'http://localhost');
const filePath = url.searchParams.get('path');
if (!filePath) {
res.writeHead(400, { 'Content-Type': 'text/plain' });
res.write('Missing "path" query param');
res.end();
return;
}
const fs = require('fs');
try {
const content = fs.readFileSync(filePath, 'utf8');
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.write(content);
res.end();
} catch (err) {
res.writeHead(404, { 'Content-Type': 'text/plain' });
res.write(err.message);
res.end();
}
return;
}

if (req.url === '/intercept') {
const targetHost = req.headers['x-host'] || '11.0.0.1';
fetch(`http://${targetHost}`)
Expand Down
63 changes: 63 additions & 0 deletions src/workerd/api/container.c++
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <workerd/io/features.h>
#include <workerd/io/io-context.h>

#include <cmath>

namespace workerd::api {

// =======================================================================================
Expand Down Expand Up @@ -74,13 +76,74 @@ void Container::start(jsg::Lock& js, jsg::Optional<StartupOptions> maybeOptions)
}
}

if (!flags.getWorkerdExperimental()) {
JSG_REQUIRE(options.snapshots == kj::none, Error,
"Container snapshot restore requires the 'experimental' compatibility flag.");
} else {
KJ_IF_SOME(snapshots, options.snapshots) {
auto list = req.initSnapshots(snapshots.size());
for (auto i: kj::indices(snapshots)) {
auto entry = list[i];
auto& restore = snapshots[i];
auto& snap = restore.snapshot;
double size = snap.size;
JSG_REQUIRE(std::isfinite(size) && size >= 0 &&
size <= static_cast<double>(jsg::MAX_SAFE_INTEGER) && std::floor(size) == size,
RangeError, "Snapshot size must be a non-negative integer <= Number.MAX_SAFE_INTEGER");
auto snapshotBuilder = entry.initSnapshot();
snapshotBuilder.setId(snap.id);
snapshotBuilder.setSize(static_cast<uint64_t>(size));
snapshotBuilder.setDir(snap.dir);
KJ_IF_SOME(name, snap.name) {
snapshotBuilder.setName(name);
}
KJ_IF_SOME(mp, restore.mountPoint) {
entry.setMountPoint(mp);
}
}
}
}

req.setCompatibilityFlags(flags);

IoContext::current().addTask(req.sendIgnoringResult());

running = true;
}

jsg::Promise<Container::DirectorySnapshot> Container::snapshotDirectory(
jsg::Lock& js, DirectorySnapshotOptions options) {
JSG_REQUIRE(
running, Error, "snapshotDirectory() cannot be called on a container that is not running.");
JSG_REQUIRE(options.dir.size() > 0 && options.dir.startsWith("/"), TypeError,
"snapshotDirectory() requires an absolute directory path (starting with '/').");

auto req = rpcClient->snapshotDirectoryRequest();
req.setDir(options.dir);

KJ_IF_SOME(name, options.name) {
req.setName(name);
}

return IoContext::current()
.awaitIo(js, req.send())
.then(
js, [](jsg::Lock& js, capnp::Response<rpc::Container::SnapshotDirectoryResults> results) {
auto snapshot = results.getSnapshot();
jsg::Optional<kj::String> name = kj::none;
auto snapshotName = snapshot.getName();
if (snapshotName.size() > 0) {
name = kj::str(snapshotName);
}

JSG_REQUIRE(snapshot.getSize() <= jsg::MAX_SAFE_INTEGER, RangeError,
"Snapshot size exceeds Number.MAX_SAFE_INTEGER");

return Container::DirectorySnapshot{kj::str(snapshot.getId()),
static_cast<double>(snapshot.getSize()), kj::str(snapshot.getDir()), kj::mv(name)};
});
}

jsg::Promise<void> Container::setInactivityTimeout(jsg::Lock& js, int64_t durationMs) {
JSG_REQUIRE(
durationMs > 0, TypeError, "setInactivityTimeout() cannot be called with a durationMs <= 0");
Expand Down
40 changes: 34 additions & 6 deletions src/workerd/api/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
// https://opensource.org/licenses/Apache-2.0

#pragma once
// APIs that an Actor (Durable Object) uses to access its own state.
//
// See actor.h for APIs used by other Workers to talk to Actors.
// Container management API for Durable Object-attached containers.
//
#include <workerd/io/compatibility-date.h>
#include <workerd/io/container.capnp.h>
Expand All @@ -24,16 +22,40 @@ class Container: public jsg::Object {
public:
Container(rpc::Container::Client rpcClient, bool running);

struct DirectorySnapshot {
kj::String id;
double size;
kj::String dir;
jsg::Optional<kj::String> name;

JSG_STRUCT(id, size, dir, name);
};

struct DirectorySnapshotOptions {
kj::String dir;
jsg::Optional<kj::String> name;

JSG_STRUCT(dir, name);
};

struct SnapshotRestoreParams {
DirectorySnapshot snapshot;
jsg::Optional<kj::String> mountPoint;

JSG_STRUCT(snapshot, mountPoint);
};

struct StartupOptions {
jsg::Optional<kj::Array<kj::String>> entrypoint;
bool enableInternet = false;
jsg::Optional<jsg::Dict<kj::String>> env;
jsg::Optional<int64_t> hardTimeout;
jsg::Optional<jsg::Dict<kj::String>> labels;
jsg::Optional<kj::Array<SnapshotRestoreParams>> snapshots;

// TODO(containers): Allow intercepting stdin/stdout/stderr by specifying streams here.

JSG_STRUCT(entrypoint, enableInternet, env, hardTimeout, labels);
JSG_STRUCT(entrypoint, enableInternet, env, hardTimeout, labels, snapshots);
JSG_STRUCT_TS_OVERRIDE_DYNAMIC(CompatibilityFlags::Reader flags) {
if (flags.getWorkerdExperimental()) {
JSG_TS_OVERRIDE(ContainerStartupOptions {
Expand All @@ -42,6 +64,7 @@ class Container: public jsg::Object {
env?: Record<string, string>;
hardTimeout?: number | bigint;
labels?: Record<string, string>;
snapshots?: ContainerSnapshotRestoreParams[];
});
} else {
JSG_TS_OVERRIDE(ContainerStartupOptions {
Expand All @@ -54,7 +77,7 @@ class Container: public jsg::Object {
}
};

bool getRunning() {
bool getRunning() const {
return running;
}

Expand All @@ -70,6 +93,8 @@ class Container: public jsg::Object {
jsg::Promise<void> interceptAllOutboundHttp(jsg::Lock& js, jsg::Ref<Fetcher> binding);
jsg::Promise<void> interceptOutboundHttps(
jsg::Lock& js, kj::String addr, jsg::Ref<Fetcher> binding);
jsg::Promise<DirectorySnapshot> snapshotDirectory(
jsg::Lock& js, DirectorySnapshotOptions options);

// TODO(containers): listenTcp()

Expand All @@ -86,6 +111,7 @@ class Container: public jsg::Object {
JSG_METHOD(interceptAllOutboundHttp);
if (flags.getWorkerdExperimental()) {
JSG_METHOD(interceptOutboundHttps);
JSG_METHOD(snapshotDirectory);
}
}

Expand All @@ -107,6 +133,8 @@ class Container: public jsg::Object {
class TcpPortOutgoingFactory;
};

#define EW_CONTAINER_ISOLATE_TYPES api::Container, api::Container::StartupOptions
#define EW_CONTAINER_ISOLATE_TYPES \
api::Container, api::Container::DirectorySnapshot, api::Container::DirectorySnapshotOptions, \
api::Container::SnapshotRestoreParams, api::Container::StartupOptions

} // namespace workerd::api
39 changes: 39 additions & 0 deletions src/workerd/io/container.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,49 @@ interface Container @0x9aaceefc06523bca {

labels @5 :List(Label);
# Optional key-value metadata labels for metrics/observability.

snapshots @6 :List(SnapshotRestoreParams);
# Directory snapshots to restore before the container starts.
}

struct Label {
name @0 :Text;
value @1 :Text;
}

struct SnapshotRestoreParams {
snapshot @0 :DirectorySnapshot;
# The snapshot to restore.

mountPoint @1 :Text;
# Where to mount the snapshot in the container filesystem.
# If empty, the snapshot is restored to its original directory.
}

struct DirectorySnapshot {
# Opaque handle to a directory snapshot.

id @0 :Text;
# Unique identifier of the snapshot.

size @1 :UInt64;
# Snapshot size, in bytes.

dir @2 :Text;
# Path of the snapshotted directory.

name @3 :Text;
# Optional human-friendly name. Empty string means not set.
}

struct SnapshotDirectoryParams {
dir @0 :Text;
# Directory path to snapshot.

name @1 :Text;
# Optional human-friendly name. Empty string means not set.
}

monitor @2 () -> (exitCode: Int32);
# Waits for the container to shut down.
#
Expand Down Expand Up @@ -142,4 +178,7 @@ interface Container @0x9aaceefc06523bca {


# TODO: setEgressTcp

snapshotDirectory @10 SnapshotDirectoryParams -> (snapshot :DirectorySnapshot);
# Creates a snapshot for a directory in the running container.
}
1 change: 0 additions & 1 deletion src/workerd/jsg/jsvalue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ bool JsNumber::isSafeInteger(Lock& js) const {
if (!inner->IsNumber()) return false;
KJ_IF_SOME(value, value(js)) {
if (std::isnan(value) || std::isinf(value) || std::trunc(value) != value) return false;
constexpr uint64_t MAX_SAFE_INTEGER = (1ull << 53) - 1;
if (std::abs(value) <= static_cast<double>(MAX_SAFE_INTEGER)) return true;
}
return false;
Expand Down
2 changes: 2 additions & 0 deletions src/workerd/jsg/jsvalue.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

namespace workerd::jsg {

constexpr uint64_t MAX_SAFE_INTEGER = (1ull << 53) - 1;

inline void requireOnStack(void* self) {
#ifdef KJ_DEBUG
kj::requireOnStack(self, "JsValue types must be allocated on stack");
Expand Down
1 change: 1 addition & 0 deletions src/workerd/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ wd_cc_library(
"//src/workerd/io:container_capnp",
"//src/workerd/jsg",
"//src/workerd/util:strings",
"//src/workerd/util:uuid",
"@ada-url",
"@capnp-cpp//src/capnp/compat:http-over-capnp",
"@capnp-cpp//src/kj",
Expand Down
Loading
Loading