-
Notifications
You must be signed in to change notification settings - Fork 595
Add Socket::proxyTo() API #6299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| using Workerd = import "/workerd/workerd.capnp"; | ||
|
|
||
| const tcpIngressExample :Workerd.Config = ( | ||
| services = [ | ||
| (name = "main", worker = .worker), | ||
| ], | ||
|
|
||
| sockets = [ | ||
| ( name = "http", address = "*:8080", http = (), service = "main" ), | ||
| ( name = "tcp", address = "*:8081", tcp = (), service = "main" ) | ||
| ] | ||
| ); | ||
|
|
||
| const worker :Workerd.Worker = ( | ||
| modules = [ | ||
| (name = "worker", esModule = embed "worker.js") | ||
| ], | ||
| compatibilityFlags = ["nodejs_compat_v2", "experimental"], | ||
| compatibilityDate = "2023-02-28", | ||
| ); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
|
|
||
| export default { | ||
| async fetch(req) { | ||
| return new Response("ok"); | ||
| }, | ||
|
|
||
| async connect(socket) { | ||
| // pipe the input stream to the output | ||
| await socket.readable.pipeTo(socket.writable); | ||
| } | ||
| }; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -126,6 +126,11 @@ class Socket: public jsg::Object { | |
| // closing. | ||
| jsg::Promise<void> close(jsg::Lock& js); | ||
|
|
||
| // Proxies to the other socket. Equivalent to: | ||
| // a.readable.pipeTo(b.writable); b.readable.pipeTo(a.writable); | ||
| // TODO: May want to add jsg::Optional<PipeToOptions> options? | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: The doc comment says this is equivalent to |
||
| jsg::Promise<void> proxyTo(jsg::Lock& js, jsg::Ref<Socket> sock); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not convinced this should be a method on import { connect, proxy } from 'cloudflare:sockets';
// ...
export default {
async connect(socket) {
proxy(socket, connect('...'));
}
}Notice I also dropped the Having
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's OK to add methods without standardizing them first. In fact that's how it's supposed to work -- somebody implements the new feature first, then if desired it gets standardized later. That said I'm not necessarily against making I don't think the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't the spec also cover whatever methods are in the
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Not necessarily. The spec currently only covers the |
||
|
|
||
| // Flushes write buffers then performs a TLS handshake on the current Socket connection. | ||
| // The current `Socket` instance is closed and its readable/writable instances are also closed. | ||
| // All new operations should be performed on the new `Socket` instance. | ||
|
|
@@ -155,6 +160,7 @@ class Socket: public jsg::Object { | |
| JSG_READONLY_PROTOTYPE_PROPERTY(secureTransport, getSecureTransport); | ||
| JSG_METHOD(close); | ||
| JSG_METHOD(startTls); | ||
| JSG_METHOD(proxyTo); | ||
|
|
||
| JSG_TS_OVERRIDE({ | ||
| get secureTransport(): 'on' | 'off' | 'starttls'; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| import { connect } from 'cloudflare:sockets'; | ||
| import { WorkerEntrypoint } from 'cloudflare:workers'; | ||
|
|
||
| export class ConnectProxy extends WorkerEntrypoint { | ||
| async connect(socket) { | ||
| // proxy for ConnectEndpoint instance on port 8083. | ||
| let upstream = connect('localhost:8083'); | ||
| await socket.proxyTo(upstream); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This pattern with the If a worker is just connecting two sockets together and not doing anything else, we need to be able to make the connection at the kj streams level, allow the IoContext to go away while the proxy pipeline is still flowing, just like we would do if we had something like
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, this is arguably an advantage of the version of the That said, I wonder if there's any way we can dig through the promise chain and discover if we're waiting on a promise that is actually just a KJ promise. This could benefit the Or another idea: Maybe The latter idea is obviously easier to implement and seems pretty OK to me for now (though it would still be useful to explore promise-unwrapping in the future).
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm... that might be a bit tricky. Imagine a case like: // In the fetch handler
const responsePromise = fetch('...');
responsePromise.then((response) => {
// This is guaranteed to run before our internal handling of the response
// and may have observable side effects
});
return responsePromise;I think the only way for it to work would be also determining that there are no other reactions on the promise that could trigger observable side effects.
I'm leaning this direction also. So long as there's a way to optionally cancel the proxy (e.g. But then again, there's a certain simple elegance if we could just return the export default {
fetch() {
// Intentionally using fetch here, not connect
const socketA = connect('...');
const socketB = connect('...');
socketA.proxyTo(socketB);
// ...
}
}
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And to be clear, I know that latter pattern is already possible with the current API... I just don't know if we want to encourage it by making it easier and more ergonomic. I don't have a an immediate good reason to say no, just being thorough in thinking through the new API
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should support proxying between any two sockets. Seems like just returning void from proxyTo() is the best option here.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SGTM |
||
| } | ||
| } | ||
|
|
||
| export class ConnectEndpoint extends WorkerEntrypoint { | ||
| async connect(socket) { | ||
| const enc = new TextEncoder(); | ||
| let writer = socket.writable.getWriter(); | ||
| await writer.write(enc.encode('hello-from-endpoint')); | ||
| await writer.close(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| import { connect } from 'cloudflare:sockets'; | ||
| import { ok, strictEqual } from 'assert'; | ||
|
|
||
| export const connectHandler = { | ||
| async test() { | ||
| // Check that the connect handler can send a message through a socket | ||
| const socket = connect('localhost:8081'); | ||
| await socket.opened; | ||
| const dec = new TextDecoder(); | ||
| let result = ''; | ||
| for await (const chunk of socket.readable) { | ||
| result += dec.decode(chunk, { stream: true }); | ||
| } | ||
| result += dec.decode(); | ||
| strictEqual(result, 'hello'); | ||
| await socket.closed; | ||
| }, | ||
| }; | ||
|
|
||
| export const connectHandlerProxy = { | ||
| async test() { | ||
| // Check that we can get a message proxied through a connect handler. This call connects us with | ||
| // an instance of Server, which serves as a proxy for an instance of OtherServer, as defined in | ||
| // connect-handler-test-proxy.js. | ||
| const socket = connect('localhost:8082'); | ||
| await socket.opened; | ||
| const dec = new TextDecoder(); | ||
| let result = ''; | ||
| for await (const chunk of socket.readable) { | ||
| result += dec.decode(chunk, { stream: true }); | ||
| } | ||
| result += dec.decode(); | ||
| strictEqual(result, 'hello-from-endpoint'); | ||
| await socket.closed; | ||
| }, | ||
| }; | ||
|
|
||
| export default { | ||
| async connect(socket) { | ||
| const enc = new TextEncoder(); | ||
| let writer = socket.writable.getWriter(); | ||
| await writer.write(enc.encode('hello')); | ||
| await writer.close(); | ||
| }, | ||
| }; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| using Workerd = import "/workerd/workerd.capnp"; | ||
|
|
||
| const unitTests :Workerd.Config = ( | ||
| services = [ | ||
| ( name = "connect-handler-test", | ||
| worker = ( | ||
| modules = [ | ||
| (name = "worker", esModule = embed "connect-handler-test.js"), | ||
| ], | ||
| compatibilityFlags = ["nodejs_compat_v2", "experimental"], | ||
| ) | ||
| ), | ||
| ( name = "connect-handler-test-proxy", | ||
| worker = ( | ||
| modules = [ | ||
| (name = "worker", esModule = embed "connect-handler-test-proxy.js"), | ||
| ], | ||
| compatibilityFlags = ["nodejs_compat_v2", "experimental"], | ||
| ) | ||
| ), | ||
| ( name = "connect-handler-test-endpoint", | ||
| worker = ( | ||
| modules = [ | ||
| (name = "worker", esModule = embed "connect-handler-test-proxy.js"), | ||
| ], | ||
| compatibilityFlags = ["nodejs_compat_v2", "experimental"], | ||
| ) | ||
| ), | ||
| ( name = "internet", network = ( allow = ["private"] ) ) | ||
| ], | ||
| sockets = [ | ||
| (name = "tcp", address = "*:8081", tcp = (), service = "connect-handler-test"), | ||
| (name = "tcp", address = "*:8082", tcp = (), service = (name = "connect-handler-test-proxy", entrypoint = "ConnectProxy")), | ||
| (name = "tcp", address = "*:8083", tcp = (), service = (name = "connect-handler-test-endpoint", entrypoint = "ConnectEndpoint")) | ||
| ] | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to think about this – one issue with taking options is that we can't apply them to both streams since the AbortSignal field can't be copied/shared between the two streams(?), having two options parameters instead seems highly inelegant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can't the single AbortSignal be used to abort both directions? I think that's what people would want.
That said, I don't think we necessarily need to support the options. People can always call pipeTo() manually if they want options.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should absolutely be just a single
AbortSignal. Users should not have to pass two and there really should not be a reason to.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And I agree, there is no reason to pass
PipeToOptions.AbortSignalis really the only option in this case that matters.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we accept an AbortSignal it should still be part of an options struct.
If we're accepting an options struct then I think it might as well be
PipeToOptions, and we might as well pass all the options into bothpipeTo()s.