Skip to content

Commit 9a9e934

Browse files
authored
Decouple transport from runtime (#2430)
As currently implemented, `pkg/transport` is tightly coupled with the underlying runtime. Specifically, transport implementations are aware of the fact that they're wrapping a container workload and even use `rt.Deployer` to stop and restart the workload. This coupling makes it hard to test the transport layer. This is the first of a series of patches that aim to decouple the transport layer from the underlying workload. First of a series of PRs addressing #2429
1 parent e20820e commit 9a9e934

18 files changed

+317
-299
lines changed

cmd/thv/app/proxy.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,11 @@ func proxyCmdFunc(cmd *cobra.Command, args []string) error {
253253

254254
// Create the transparent proxy with middlewares
255255
proxy := transparent.NewTransparentProxy(
256-
proxyHost, port, serverName, proxyTargetURI,
257-
nil, authInfoHandler,
256+
proxyHost,
257+
port,
258+
proxyTargetURI,
259+
nil,
260+
authInfoHandler,
258261
false,
259262
false, // isRemote
260263
"",

pkg/runner/runner.go

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/stacklok/toolhive/pkg/labels"
1919
"github.com/stacklok/toolhive/pkg/logger"
2020
"github.com/stacklok/toolhive/pkg/process"
21+
"github.com/stacklok/toolhive/pkg/runtime"
2122
"github.com/stacklok/toolhive/pkg/secrets"
2223
"github.com/stacklok/toolhive/pkg/telemetry"
2324
"github.com/stacklok/toolhive/pkg/transport"
@@ -137,11 +138,6 @@ func (r *Runner) Run(ctx context.Context) error {
137138
// Set proxy mode for stdio transport
138139
transportConfig.ProxyMode = r.Config.ProxyMode
139140

140-
transportHandler, err := transport.NewFactory().Create(transportConfig)
141-
if err != nil {
142-
return fmt.Errorf("failed to create transport: %v", err)
143-
}
144-
145141
// Process secrets if provided (regular secrets or RemoteAuthConfig.ClientSecret in CLI format)
146142
hasRegularSecrets := len(r.Config.Secrets) > 0
147143
hasRemoteAuthSecret := r.Config.RemoteAuthConfig != nil && r.Config.RemoteAuthConfig.ClientSecret != ""
@@ -175,7 +171,48 @@ func (r *Runner) Run(ctx context.Context) error {
175171
// Set up the transport
176172
logger.Infof("Setting up %s transport...", r.Config.Transport)
177173

178-
// For remote MCP servers, set the remote URL on HTTP transports before setup
174+
// Prepare transport options based on workload type
175+
var transportOpts []transport.Option
176+
var setupResult *runtime.SetupResult
177+
178+
if r.Config.RemoteURL == "" {
179+
// For local workloads, deploy the container using runtime.Setup first
180+
result, err := runtime.Setup(
181+
ctx,
182+
r.Config.Transport,
183+
r.Config.Deployer,
184+
r.Config.ContainerName,
185+
r.Config.Image,
186+
r.Config.CmdArgs,
187+
r.Config.EnvVars,
188+
r.Config.ContainerLabels,
189+
r.Config.PermissionProfile,
190+
r.Config.K8sPodTemplatePatch,
191+
r.Config.IsolateNetwork,
192+
r.Config.IgnoreConfig,
193+
r.Config.Host,
194+
r.Config.TargetPort,
195+
r.Config.TargetHost,
196+
)
197+
if err != nil {
198+
return fmt.Errorf("failed to set up workload: %v", err)
199+
}
200+
setupResult = result
201+
202+
// Configure the transport with the setup results using options
203+
transportOpts = append(transportOpts, transport.WithContainerName(setupResult.ContainerName))
204+
if setupResult.TargetURI != "" {
205+
transportOpts = append(transportOpts, transport.WithTargetURI(setupResult.TargetURI))
206+
}
207+
}
208+
209+
// Create transport with options
210+
transportHandler, err := transport.NewFactory().Create(transportConfig, transportOpts...)
211+
if err != nil {
212+
return fmt.Errorf("failed to create transport: %v", err)
213+
}
214+
215+
// For remote MCP servers, set the remote URL on HTTP transports
179216
if r.Config.RemoteURL != "" {
180217
if httpTransport, ok := transportHandler.(interface{ SetRemoteURL(string) }); ok {
181218
httpTransport.SetRemoteURL(r.Config.RemoteURL)
@@ -191,17 +228,6 @@ func (r *Runner) Run(ctx context.Context) error {
191228
if httpTransport, ok := transportHandler.(interface{ SetTokenSource(oauth2.TokenSource) }); ok {
192229
httpTransport.SetTokenSource(tokenSource)
193230
}
194-
195-
// For remote workloads, we don't need a deployer
196-
r.Config.Deployer = nil
197-
}
198-
199-
if err := transportHandler.Setup(
200-
ctx, r.Config.Deployer, r.Config.ContainerName, r.Config.Image, r.Config.CmdArgs,
201-
r.Config.EnvVars, r.Config.ContainerLabels, r.Config.PermissionProfile, r.Config.K8sPodTemplatePatch,
202-
r.Config.IsolateNetwork, r.Config.IgnoreConfig,
203-
); err != nil {
204-
return fmt.Errorf("failed to set up transport: %v", err)
205231
}
206232

207233
// Start the transport (which also starts the container and monitoring)

pkg/runtime/setup.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Package runtime provides workload deployment setup functionality
2+
// that was previously part of the transport package.
3+
package runtime
4+
5+
import (
6+
"context"
7+
"fmt"
8+
9+
rt "github.com/stacklok/toolhive/pkg/container/runtime"
10+
"github.com/stacklok/toolhive/pkg/ignore"
11+
"github.com/stacklok/toolhive/pkg/logger"
12+
"github.com/stacklok/toolhive/pkg/permissions"
13+
"github.com/stacklok/toolhive/pkg/transport/types"
14+
)
15+
16+
var transportEnvMap = map[types.TransportType]string{
17+
types.TransportTypeSSE: "sse",
18+
types.TransportTypeStreamableHTTP: "streamable-http",
19+
types.TransportTypeStdio: "stdio",
20+
}
21+
22+
// SetupResult contains the results of setting up a workload
23+
type SetupResult struct {
24+
ContainerName string
25+
TargetURI string
26+
TargetPort int
27+
TargetHost string
28+
}
29+
30+
// Setup prepares and deploys a workload for use with a transport.
31+
// The runtime parameter provides access to container operations.
32+
// The permissionProfile is used to configure container permissions (including network mode).
33+
// The k8sPodTemplatePatch is a JSON string to patch the Kubernetes pod template.
34+
// Returns the container name and target URI for configuring the transport.
35+
func Setup(
36+
ctx context.Context,
37+
transportType types.TransportType,
38+
runtime rt.Deployer,
39+
containerName string,
40+
image string,
41+
cmdArgs []string,
42+
envVars, labels map[string]string,
43+
permissionProfile *permissions.Profile,
44+
k8sPodTemplatePatch string,
45+
isolateNetwork bool,
46+
ignoreConfig *ignore.Config,
47+
host string,
48+
targetPort int,
49+
targetHost string,
50+
) (*SetupResult, error) {
51+
// Add transport-specific environment variables
52+
env, ok := transportEnvMap[transportType]
53+
if !ok && transportType != types.TransportTypeStdio {
54+
return nil, fmt.Errorf("unsupported transport type: %s", transportType)
55+
}
56+
57+
// For stdio transport, env is already set above
58+
if transportType == types.TransportTypeStdio {
59+
envVars["MCP_TRANSPORT"] = "stdio"
60+
} else {
61+
envVars["MCP_TRANSPORT"] = env
62+
63+
// Use the target port for the container's environment variables
64+
envVars["MCP_PORT"] = fmt.Sprintf("%d", targetPort)
65+
envVars["FASTMCP_PORT"] = fmt.Sprintf("%d", targetPort)
66+
envVars["MCP_HOST"] = targetHost
67+
}
68+
69+
// Create workload options
70+
containerOptions := rt.NewDeployWorkloadOptions()
71+
containerOptions.K8sPodTemplatePatch = k8sPodTemplatePatch
72+
containerOptions.IgnoreConfig = ignoreConfig
73+
74+
if transportType == types.TransportTypeStdio {
75+
containerOptions.AttachStdio = true
76+
} else {
77+
// Expose the target port in the container
78+
containerPortStr := fmt.Sprintf("%d/tcp", targetPort)
79+
containerOptions.ExposedPorts[containerPortStr] = struct{}{}
80+
81+
// Create host port bindings (configurable through the --host flag)
82+
portBindings := []rt.PortBinding{
83+
{
84+
HostIP: host,
85+
HostPort: fmt.Sprintf("%d", targetPort),
86+
},
87+
}
88+
89+
// Set the port bindings
90+
containerOptions.PortBindings[containerPortStr] = portBindings
91+
}
92+
93+
// Create the container
94+
logger.Infof("Deploying workload %s from image %s...", containerName, image)
95+
exposedPort, err := runtime.DeployWorkload(
96+
ctx,
97+
image,
98+
containerName,
99+
cmdArgs,
100+
envVars,
101+
labels,
102+
permissionProfile,
103+
transportType.String(),
104+
containerOptions,
105+
isolateNetwork,
106+
)
107+
if err != nil {
108+
return nil, fmt.Errorf("failed to create container: %v", err)
109+
}
110+
logger.Infof("Container created: %s", containerName)
111+
112+
result := &SetupResult{
113+
ContainerName: containerName,
114+
TargetHost: targetHost,
115+
TargetPort: targetPort,
116+
}
117+
118+
// For stdio transport, there's no target URI
119+
if transportType == types.TransportTypeStdio {
120+
result.TargetURI = ""
121+
} else {
122+
// Update target host and port if needed (for Kubernetes)
123+
if (transportType == types.TransportTypeSSE || transportType == types.TransportTypeStreamableHTTP) && rt.IsKubernetesRuntime() {
124+
// If the SSEHeadlessServiceName is set, use it as the target host
125+
if containerOptions.SSEHeadlessServiceName != "" {
126+
result.TargetHost = containerOptions.SSEHeadlessServiceName
127+
}
128+
}
129+
130+
// we don't want to override the targetPort in a Kubernetes deployment. Because
131+
// by default the Kubernetes container deployer returns `0` for the exposedPort
132+
// therefore causing the "target port not set" error when it is assigned to the targetPort.
133+
// Issues:
134+
// - https://github.com/stacklok/toolhive/issues/902
135+
// - https://github.com/stacklok/toolhive/issues/924
136+
if !rt.IsKubernetesRuntime() {
137+
result.TargetPort = exposedPort
138+
}
139+
140+
// Construct target URI
141+
result.TargetURI = fmt.Sprintf("http://%s:%d", result.TargetHost, result.TargetPort)
142+
}
143+
144+
return result, nil
145+
}

pkg/transport/factory.go

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,42 @@ func NewFactory() *Factory {
1515
return &Factory{}
1616
}
1717

18+
// Option is a function that configures a transport
19+
type Option func(types.Transport) error
20+
21+
// WithContainerName returns an option that sets the container name on a transport
22+
func WithContainerName(containerName string) Option {
23+
return func(t types.Transport) error {
24+
if setter, ok := t.(interface{ setContainerName(string) }); ok {
25+
setter.setContainerName(containerName)
26+
}
27+
return nil
28+
}
29+
}
30+
31+
// WithTargetURI returns an option that sets the target URI on a transport
32+
func WithTargetURI(targetURI string) Option {
33+
return func(t types.Transport) error {
34+
if setter, ok := t.(interface{ setTargetURI(string) }); ok {
35+
setter.setTargetURI(targetURI)
36+
}
37+
return nil
38+
}
39+
}
40+
1841
// Create creates a transport based on the provided configuration
19-
func (*Factory) Create(config types.Config) (types.Transport, error) {
42+
func (*Factory) Create(config types.Config, opts ...Option) (types.Transport, error) {
43+
var tr types.Transport
44+
2045
switch config.Type {
2146
case types.TransportTypeStdio:
22-
tr := NewStdioTransport(
47+
tr = NewStdioTransport(
2348
config.Host, config.ProxyPort, config.Deployer, config.Debug, config.TrustProxyHeaders,
2449
config.PrometheusHandler, config.Middlewares...,
2550
)
26-
tr.SetProxyMode(config.ProxyMode)
27-
return tr, nil
51+
tr.(*StdioTransport).SetProxyMode(config.ProxyMode)
2852
case types.TransportTypeSSE:
29-
return NewHTTPTransport(
53+
tr = NewHTTPTransport(
3054
types.TransportTypeSSE,
3155
config.Host,
3256
config.ProxyPort,
@@ -37,9 +61,9 @@ func (*Factory) Create(config types.Config) (types.Transport, error) {
3761
config.AuthInfoHandler,
3862
config.PrometheusHandler,
3963
config.Middlewares...,
40-
), nil
64+
)
4165
case types.TransportTypeStreamableHTTP:
42-
return NewHTTPTransport(
66+
tr = NewHTTPTransport(
4367
types.TransportTypeStreamableHTTP,
4468
config.Host,
4569
config.ProxyPort,
@@ -50,11 +74,20 @@ func (*Factory) Create(config types.Config) (types.Transport, error) {
5074
config.AuthInfoHandler,
5175
config.PrometheusHandler,
5276
config.Middlewares...,
53-
), nil
77+
)
5478
case types.TransportTypeInspector:
5579
// HTTP transport is not implemented yet
5680
return nil, errors.ErrUnsupportedTransport
5781
default:
5882
return nil, errors.ErrUnsupportedTransport
5983
}
84+
85+
// Apply options to the transport
86+
for _, opt := range opts {
87+
if err := opt(tr); err != nil {
88+
return nil, err
89+
}
90+
}
91+
92+
return tr, nil
6093
}

0 commit comments

Comments
 (0)