From e95a810a83f554370a3433bbcc494a77d6131a0d Mon Sep 17 00:00:00 2001 From: Dave Page Date: Wed, 27 May 2026 17:37:18 +0100 Subject: [PATCH 01/10] docs: changelog entry for embedding library migration (PR 2) Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/changelog.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/changelog.md b/docs/changelog.md index d08ab9c..42a9691 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -26,7 +26,6 @@ and this project adheres to - OpenAI models that require the Responses API (`gpt-5-*`, `o1-*`, `o3-*`) are now supported transparently; the library routes them to `/v1/responses` automatically based on the model name. - - Embedding provider clients (Voyage, OpenAI, Ollama) now use the shared [`pgedge-go-llm-lib`](https://github.com/pgEdge/pgedge-go-llm-lib) From 5b592eed583aa237c1996d3767a6e188a89011b6 Mon Sep 17 00:00:00 2001 From: Dave Page Date: Wed, 27 May 2026 19:03:51 +0100 Subject: [PATCH 02/10] config+server: add Gemini provider support Adds GeminiAPIKey / GeminiAPIKeyFile fields to LLMConfig with the same priority resolution as the other providers (env > file > config). The server now registers a "gemini" entry in the proxy's provider map when the key is set. The pgedge-go-llm-lib registers the gemini provider via the existing _ "llm/all" / _ "llm/provider/gemini" imports. Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/pgedge-pg-mcp-svr/main.go | 8 ++++++++ internal/config/config.go | 13 +++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/cmd/pgedge-pg-mcp-svr/main.go b/cmd/pgedge-pg-mcp-svr/main.go index ad14f66..e9c2f9a 100644 --- a/cmd/pgedge-pg-mcp-svr/main.go +++ b/cmd/pgedge-pg-mcp-svr/main.go @@ -902,6 +902,14 @@ func main() { Temperature: llm.Float(cfg.LLM.Temperature), } } + if cfg.LLM.GeminiAPIKey != "" { + providers["gemini"] = llm.Options{ + APIKey: cfg.LLM.GeminiAPIKey, + Model: cfg.LLM.Model, + MaxTokens: llm.Int(cfg.LLM.MaxTokens), + Temperature: llm.Float(cfg.LLM.Temperature), + } + } if len(providers) == 0 { return fmt.Errorf("LLM is enabled but no provider is configured; " + diff --git a/internal/config/config.go b/internal/config/config.go index 13225b1..b741e2e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -446,7 +446,7 @@ type EmbeddingConfig struct { // LLMConfig holds LLM configuration for web client chat proxy type LLMConfig struct { Enabled bool `yaml:"enabled"` // Whether LLM proxy is enabled (default: false) - Provider string `yaml:"provider"` // "anthropic", "openai", or "ollama" + Provider string `yaml:"provider"` // "anthropic", "openai", "ollama", or "gemini" Model string `yaml:"model"` // Provider-specific model name AnthropicAPIKey string `yaml:"anthropic_api_key"` // API key for Anthropic (direct - discouraged, use api_key_file or env var instead) AnthropicAPIKeyFile string `yaml:"anthropic_api_key_file"` // Path to file containing Anthropic API key @@ -455,6 +455,8 @@ type LLMConfig struct { OpenAIAPIKeyFile string `yaml:"openai_api_key_file"` // Path to file containing OpenAI API key OpenAIBaseURL string `yaml:"openai_base_url"` // Base URL for OpenAI API (default: https://api.openai.com) OllamaURL string `yaml:"ollama_url"` // URL for Ollama service (default: http://localhost:11434) + GeminiAPIKey string `yaml:"gemini_api_key"` // API key for Google Gemini (direct - discouraged, use api_key_file or env var instead) + GeminiAPIKeyFile string `yaml:"gemini_api_key_file"` // Path to file containing Gemini API key MaxTokens int `yaml:"max_tokens"` // Maximum tokens for LLM response (default: 4096) Temperature float64 `yaml:"temperature"` // Temperature for LLM sampling (default: 0.7) } @@ -1011,6 +1013,7 @@ func applyEnvironmentVariables(cfg *Config) { // 1. Try environment variables first (PGEDGE_ prefixed, then standard) setStringFromEnvWithFallback(&cfg.LLM.AnthropicAPIKey, "PGEDGE_ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY") setStringFromEnvWithFallback(&cfg.LLM.OpenAIAPIKey, "PGEDGE_OPENAI_API_KEY", "OPENAI_API_KEY") + setStringFromEnvWithFallback(&cfg.LLM.GeminiAPIKey, "PGEDGE_GEMINI_API_KEY", "GEMINI_API_KEY") // 2. If env vars not set and api_key_file is specified, load from file if cfg.LLM.AnthropicAPIKey == "" && cfg.LLM.AnthropicAPIKeyFile != "" { if key, err := readAPIKeyFromFile(cfg.LLM.AnthropicAPIKeyFile); err == nil && key != "" { @@ -1024,7 +1027,13 @@ func applyEnvironmentVariables(cfg *Config) { } // Note: errors are silently ignored - file may not exist and that's ok } - // 3. Direct config value (if set) is already in cfg.LLM.AnthropicAPIKey/OpenAIAPIKey from mergeConfig + if cfg.LLM.GeminiAPIKey == "" && cfg.LLM.GeminiAPIKeyFile != "" { + if key, err := readAPIKeyFromFile(cfg.LLM.GeminiAPIKeyFile); err == nil && key != "" { + cfg.LLM.GeminiAPIKey = key + } + // Note: errors are silently ignored - file may not exist and that's ok + } + // 3. Direct config value (if set) is already in cfg.LLM.AnthropicAPIKey/OpenAIAPIKey/GeminiAPIKey from mergeConfig setStringFromEnv(&cfg.LLM.OllamaURL, "PGEDGE_OLLAMA_URL") // Base URL overrides for LLM providers (useful for proxies) setStringFromEnv(&cfg.LLM.AnthropicBaseURL, "PGEDGE_ANTHROPIC_BASE_URL") From 89490749fa2b7cc2e0a16d808fdf7270315bf5b6 Mon Sep 17 00:00:00 2001 From: Dave Page Date: Wed, 27 May 2026 19:07:33 +0100 Subject: [PATCH 03/10] tools: switch from embedding.NewProvider to llm.NewClient directly The three embedding-consuming tools (search_knowledgebase, generate_embedding, similarity_search) no longer go through the internal/embedding wrapper layer. A shared internal/tools/embed_client.go helper builds an llm.Client from the per-provider config fields, applying the same defaults the wrapper used to apply. newEmbedClient returns (llm.Client, string, error) where the string is the resolved model name (after defaults such as voyage-3-lite and nomic-embed-text are applied). generate_embedding.go uses this resolved model for metadata display instead of provider.ModelName(); it uses len(vec) for dimensions (populated after the Embed call) and cfg.Embedding.Provider for the provider name. The embedding package now has no consumers outside itself; it is ready to be deleted in the next commit. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/tools/embed_client.go | 90 ++++++++++++++++++++++++++ internal/tools/generate_embedding.go | 17 ++--- internal/tools/search_knowledgebase.go | 11 ++-- internal/tools/similarity_search.go | 9 +-- 4 files changed, 104 insertions(+), 23 deletions(-) create mode 100644 internal/tools/embed_client.go diff --git a/internal/tools/embed_client.go b/internal/tools/embed_client.go new file mode 100644 index 0000000..de9539f --- /dev/null +++ b/internal/tools/embed_client.go @@ -0,0 +1,90 @@ +/*------------------------------------------------------------------------- + * + * pgEdge Natural Language Agent + * + * Copyright (c) 2025 - 2026, pgEdge, Inc. + * This software is released under The PostgreSQL License + * + *------------------------------------------------------------------------- + */ + +package tools + +import ( + "fmt" + + "github.com/pgEdge/pgedge-go-llm-lib/llm" + _ "github.com/pgEdge/pgedge-go-llm-lib/llm/provider/ollama" + _ "github.com/pgEdge/pgedge-go-llm-lib/llm/provider/openai" + _ "github.com/pgEdge/pgedge-go-llm-lib/llm/provider/voyage" +) + +// embedClientConfig collects the per-provider config fields the tools +// need to construct an llm.Client for embeddings. +type embedClientConfig struct { + Provider string + Model string + VoyageAPIKey string + VoyageBaseURL string + OpenAIAPIKey string + OpenAIBaseURL string + OllamaURL string +} + +// newEmbedClient builds an llm.Client for the configured embedding +// provider, applying the same defaults the previous embedding.Provider +// wrapper applied (voyage-3-lite model default, nomic-embed-text for +// ollama, localhost:11434 for the ollama URL). It returns the client +// and the resolved model name (after defaults are applied) so callers +// can display it without re-deriving the default logic. +func newEmbedClient(cfg embedClientConfig) (llm.Client, string, error) { + var opts llm.Options + switch cfg.Provider { + case "voyage": + if cfg.VoyageAPIKey == "" { + return nil, "", fmt.Errorf("Voyage AI API key is required when provider is 'voyage'") + } + model := cfg.Model + if model == "" { + model = "voyage-3-lite" + } + opts = llm.Options{ + APIKey: cfg.VoyageAPIKey, + Model: model, + BaseURL: cfg.VoyageBaseURL, + } + case "openai": + if cfg.OpenAIAPIKey == "" { + return nil, "", fmt.Errorf("OpenAI API key is required when provider is 'openai'") + } + opts = llm.Options{ + APIKey: cfg.OpenAIAPIKey, + Model: cfg.Model, + BaseURL: cfg.OpenAIBaseURL, + } + case "ollama": + baseURL := cfg.OllamaURL + if baseURL == "" { + baseURL = "http://localhost:11434" + } + model := cfg.Model + if model == "" { + model = "nomic-embed-text" + } + opts = llm.Options{ + Model: model, + BaseURL: baseURL, + } + default: + return nil, "", fmt.Errorf( + "unsupported embedding provider: %s (supported: voyage, openai, ollama)", + cfg.Provider, + ) + } + + client, err := llm.NewClient(cfg.Provider, opts) + if err != nil { + return nil, "", fmt.Errorf("create %s embedding client: %w", cfg.Provider, err) + } + return client, opts.Model, nil +} diff --git a/internal/tools/generate_embedding.go b/internal/tools/generate_embedding.go index 81635bd..361ae18 100644 --- a/internal/tools/generate_embedding.go +++ b/internal/tools/generate_embedding.go @@ -17,7 +17,6 @@ import ( "strings" "pgedge-postgres-mcp/internal/config" - "pgedge-postgres-mcp/internal/embedding" "pgedge-postgres-mcp/internal/mcp" ) @@ -55,8 +54,8 @@ func GenerateEmbeddingTool(cfg *config.Config) Tool { return mcp.NewToolError("'text' parameter cannot be empty or whitespace-only") } - // Create embedding provider from config - embCfg := embedding.Config{ + // Create embedding client from config + client, resolvedModel, err := newEmbedClient(embedClientConfig{ Provider: cfg.Embedding.Provider, Model: cfg.Embedding.Model, VoyageAPIKey: cfg.Embedding.VoyageAPIKey, @@ -64,16 +63,14 @@ func GenerateEmbeddingTool(cfg *config.Config) Tool { OpenAIAPIKey: cfg.Embedding.OpenAIAPIKey, OpenAIBaseURL: cfg.Embedding.OpenAIBaseURL, OllamaURL: cfg.Embedding.OllamaURL, - } - - provider, err := embedding.NewProvider(embCfg) + }) if err != nil { return mcp.NewToolError(fmt.Sprintf("Failed to initialize embedding provider: %v", err)) } // Generate embedding ctx := context.Background() - vector, err := provider.Embed(ctx, text) + vector, err := client.Embed(ctx, text) if err != nil { return mcp.NewToolError(fmt.Sprintf("Failed to generate embedding: %v", err)) } @@ -92,9 +89,9 @@ func GenerateEmbeddingTool(cfg *config.Config) Tool { sb.WriteString("Embedding Generated Successfully\n") sb.WriteString(strings.Repeat("=", 50)) sb.WriteString("\n\n") - fmt.Fprintf(&sb, "Provider: %s\n", provider.ProviderName()) - fmt.Fprintf(&sb, "Model: %s\n", provider.ModelName()) - fmt.Fprintf(&sb, "Dimensions: %d\n", provider.Dimensions()) + fmt.Fprintf(&sb, "Provider: %s\n", cfg.Embedding.Provider) + fmt.Fprintf(&sb, "Model: %s\n", resolvedModel) + fmt.Fprintf(&sb, "Dimensions: %d\n", len(vector)) fmt.Fprintf(&sb, "Text Length: %d characters\n", len(text)) fmt.Fprintf(&sb, "\nText:\n%s\n\n", text) fmt.Fprintf(&sb, "Embedding Vector (%d dimensions):\n%s", len(vector), string(vectorJSON)) diff --git a/internal/tools/search_knowledgebase.go b/internal/tools/search_knowledgebase.go index 838ed75..f1c556f 100644 --- a/internal/tools/search_knowledgebase.go +++ b/internal/tools/search_knowledgebase.go @@ -21,7 +21,6 @@ import ( _ "modernc.org/sqlite" "pgedge-postgres-mcp/internal/config" - "pgedge-postgres-mcp/internal/embedding" "pgedge-postgres-mcp/internal/mcp" ) @@ -256,7 +255,7 @@ func generateKBQueryEmbedding(serverCfg *config.Config, queryText string) ([]flo return nil, "", fmt.Errorf("knowledgebase embedding provider not configured") } - embCfg := embedding.Config{ + client, _, err := newEmbedClient(embedClientConfig{ Provider: kbCfg.EmbeddingProvider, Model: kbCfg.EmbeddingModel, VoyageAPIKey: kbCfg.EmbeddingVoyageAPIKey, @@ -264,15 +263,13 @@ func generateKBQueryEmbedding(serverCfg *config.Config, queryText string) ([]flo OpenAIAPIKey: kbCfg.EmbeddingOpenAIAPIKey, OpenAIBaseURL: kbCfg.EmbeddingOpenAIBaseURL, OllamaURL: kbCfg.EmbeddingOllamaURL, - } - - provider, err := embedding.NewProvider(embCfg) + }) if err != nil { return nil, "", err } ctx := context.Background() - vector, err := provider.Embed(ctx, queryText) + vector, err := client.Embed(ctx, queryText) if err != nil { return nil, "", err } @@ -287,7 +284,7 @@ func generateKBQueryEmbedding(serverCfg *config.Config, queryText string) ([]flo vector32[i] = float32(v) } - return vector32, embCfg.Provider, nil + return vector32, kbCfg.EmbeddingProvider, nil } func searchKB(kbPath string, queryEmbedding []float32, projectNames, projectVersions []string, topN int, provider string) ([]KBSearchResult, error) { diff --git a/internal/tools/similarity_search.go b/internal/tools/similarity_search.go index dc9fa83..d53f875 100644 --- a/internal/tools/similarity_search.go +++ b/internal/tools/similarity_search.go @@ -17,7 +17,6 @@ import ( "pgedge-postgres-mcp/internal/config" "pgedge-postgres-mcp/internal/database" - "pgedge-postgres-mcp/internal/embedding" "pgedge-postgres-mcp/internal/logging" "pgedge-postgres-mcp/internal/mcp" "pgedge-postgres-mcp/internal/search" @@ -617,7 +616,7 @@ func generateQueryEmbeddingWithConfig(serverCfg *config.Config, queryText string return nil, fmt.Errorf("embedding generation is not enabled in server configuration") } - embCfg := embedding.Config{ + client, _, err := newEmbedClient(embedClientConfig{ Provider: serverCfg.Embedding.Provider, Model: serverCfg.Embedding.Model, VoyageAPIKey: serverCfg.Embedding.VoyageAPIKey, @@ -625,15 +624,13 @@ func generateQueryEmbeddingWithConfig(serverCfg *config.Config, queryText string OpenAIAPIKey: serverCfg.Embedding.OpenAIAPIKey, OpenAIBaseURL: serverCfg.Embedding.OpenAIBaseURL, OllamaURL: serverCfg.Embedding.OllamaURL, - } - - provider, err := embedding.NewProvider(embCfg) + }) if err != nil { return nil, err } ctx := context.Background() - vector, err := provider.Embed(ctx, queryText) + vector, err := client.Embed(ctx, queryText) if err != nil { return nil, err } From a25350ed81f224a770c1736e50494b4fdbad9e5a Mon Sep 17 00:00:00 2001 From: Dave Page Date: Wed, 27 May 2026 19:08:20 +0100 Subject: [PATCH 04/10] embedding: delete package; tools now use llm.NewClient directly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After PR 4's tool refactor (previous commit) nothing in the codebase imports internal/embedding. The package is removed entirely — its responsibilities are split between pgedge-go-llm-lib (provider wire code) and internal/tools/embed_client.go (config translation + defaults). Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/embedding/libprovider.go | 132 ------------------- internal/embedding/libprovider_test.go | 172 ------------------------- internal/embedding/provider.go | 47 ------- internal/embedding/provider_test.go | 160 ----------------------- 4 files changed, 511 deletions(-) delete mode 100644 internal/embedding/libprovider.go delete mode 100644 internal/embedding/libprovider_test.go delete mode 100644 internal/embedding/provider.go delete mode 100644 internal/embedding/provider_test.go diff --git a/internal/embedding/libprovider.go b/internal/embedding/libprovider.go deleted file mode 100644 index a510132..0000000 --- a/internal/embedding/libprovider.go +++ /dev/null @@ -1,132 +0,0 @@ -/*------------------------------------------------------------------------- - * - * pgEdge Natural Language Agent - * - * Copyright (c) 2025 - 2026, pgEdge, Inc. - * This software is released under The PostgreSQL License - * - *------------------------------------------------------------------------- - */ - -package embedding - -import ( - "context" - "fmt" - "sync/atomic" - - "github.com/pgEdge/pgedge-go-llm-lib/llm" - _ "github.com/pgEdge/pgedge-go-llm-lib/llm/provider/ollama" - _ "github.com/pgEdge/pgedge-go-llm-lib/llm/provider/openai" - _ "github.com/pgEdge/pgedge-go-llm-lib/llm/provider/voyage" -) - -// libProvider is the implementation of Provider backed by -// pgedge-go-llm-lib. Constructed via NewProvider; satisfies the -// existing Provider interface so tool consumers compile unchanged. -// -// Dimensions are not known up front because the library does not -// expose them ahead of an Embed call. The first successful Embed -// populates the cached dimensions atomically; Dimensions() returns -// 0 until that happens. -type libProvider struct { - inner llm.Client - provider string - model string - dim atomic.Int32 -} - -// NewProvider constructs a Provider backed by pgedge-go-llm-lib. -// Supported provider names: "voyage", "openai", "ollama". -func NewProvider(cfg Config) (Provider, error) { - opts, err := optionsForConfig(cfg) - if err != nil { - return nil, err - } - - inner, err := llm.NewClient(cfg.Provider, opts) - if err != nil { - return nil, fmt.Errorf("create %s client: %w", cfg.Provider, err) - } - - return &libProvider{ - inner: inner, - provider: cfg.Provider, - model: opts.Model, - }, nil -} - -// optionsForConfig translates our Config into llm.Options for the -// named provider, applying provider-specific defaults. -func optionsForConfig(cfg Config) (llm.Options, error) { - switch cfg.Provider { - case "voyage": - if cfg.VoyageAPIKey == "" { - return llm.Options{}, fmt.Errorf("Voyage AI API key is required when provider is 'voyage'") - } - model := cfg.Model - if model == "" { - model = "voyage-3-lite" - } - return llm.Options{ - APIKey: cfg.VoyageAPIKey, - Model: model, - BaseURL: cfg.VoyageBaseURL, - }, nil - - case "openai": - if cfg.OpenAIAPIKey == "" { - return llm.Options{}, fmt.Errorf("OpenAI API key is required when provider is 'openai'") - } - return llm.Options{ - APIKey: cfg.OpenAIAPIKey, - Model: cfg.Model, - BaseURL: cfg.OpenAIBaseURL, - }, nil - - case "ollama": - baseURL := cfg.OllamaURL - if baseURL == "" { - baseURL = "http://localhost:11434" - } - model := cfg.Model - if model == "" { - model = "nomic-embed-text" - } - return llm.Options{ - Model: model, - BaseURL: baseURL, - }, nil - - default: - return llm.Options{}, fmt.Errorf("unsupported embedding provider: %s (supported: voyage, openai, ollama)", cfg.Provider) - } -} - -// Embed generates an embedding vector and lazily caches the dimension. -func (p *libProvider) Embed(ctx context.Context, text string) ([]float64, error) { - vec, err := p.inner.Embed(ctx, text) - if err != nil { - return nil, err - } - if d := int32(len(vec)); d > 0 { - p.dim.CompareAndSwap(0, d) - } - return vec, nil -} - -// Dimensions returns the cached embedding dimension, or 0 if Embed -// has not yet been called successfully. -func (p *libProvider) Dimensions() int { - return int(p.dim.Load()) -} - -// ModelName returns the configured model name. -func (p *libProvider) ModelName() string { - return p.model -} - -// ProviderName returns the provider name ("voyage" / "openai" / "ollama"). -func (p *libProvider) ProviderName() string { - return p.provider -} diff --git a/internal/embedding/libprovider_test.go b/internal/embedding/libprovider_test.go deleted file mode 100644 index b4b7c52..0000000 --- a/internal/embedding/libprovider_test.go +++ /dev/null @@ -1,172 +0,0 @@ -/*------------------------------------------------------------------------- - * - * pgEdge Natural Language Agent - * - * Copyright (c) 2025 - 2026, pgEdge, Inc. - * This software is released under The PostgreSQL License - * - *------------------------------------------------------------------------- - */ - -package embedding - -import ( - "context" - "net/http" - "net/http/httptest" - "strings" - "testing" -) - -func TestLibProvider_Voyage_Embed_RoundTrip(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte(`{ - "data":[{"embedding":[0.1,0.2,0.3,0.4],"index":0}], - "model":"voyage-3-lite", - "usage":{"total_tokens":5} - }`)) - })) - defer server.Close() - - p, err := NewProvider(Config{ - Provider: "voyage", - Model: "voyage-3-lite", - VoyageAPIKey: "test-key", - VoyageBaseURL: server.URL, - }) - if err != nil { - t.Fatalf("NewProvider: %v", err) - } - - vec, err := p.Embed(context.Background(), "hello world") - if err != nil { - t.Fatalf("Embed: %v", err) - } - if len(vec) != 4 { - t.Errorf("expected 4-dim vector, got %d", len(vec)) - } - if p.Dimensions() != 4 { - t.Errorf("Dimensions() = %d, want 4", p.Dimensions()) - } - if p.ProviderName() != "voyage" { - t.Errorf("ProviderName() = %q", p.ProviderName()) - } - if p.ModelName() != "voyage-3-lite" { - t.Errorf("ModelName() = %q", p.ModelName()) - } -} - -func TestLibProvider_OpenAI_Embed_RoundTrip(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte(`{ - "object":"list", - "data":[{"object":"embedding","embedding":[0.5,0.6,0.7],"index":0}], - "model":"text-embedding-3-small", - "usage":{"prompt_tokens":3,"total_tokens":3} - }`)) - })) - defer server.Close() - - p, err := NewProvider(Config{ - Provider: "openai", - Model: "text-embedding-3-small", - OpenAIAPIKey: "test-key", - OpenAIBaseURL: server.URL, - }) - if err != nil { - t.Fatalf("NewProvider: %v", err) - } - - vec, err := p.Embed(context.Background(), "hello") - if err != nil { - t.Fatalf("Embed: %v", err) - } - if len(vec) != 3 { - t.Errorf("vec len = %d", len(vec)) - } - if p.Dimensions() != 3 { - t.Errorf("Dimensions() = %d", p.Dimensions()) - } -} - -func TestLibProvider_Ollama_Embed_RoundTrip(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if !strings.HasPrefix(r.URL.Path, "/api/embed") { - t.Errorf("unexpected path %s", r.URL.Path) - } - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte(`{"embeddings":[[0.9,0.8]]}`)) - })) - defer server.Close() - - p, err := NewProvider(Config{ - Provider: "ollama", - Model: "nomic-embed-text", - OllamaURL: server.URL, - }) - if err != nil { - t.Fatalf("NewProvider: %v", err) - } - - vec, err := p.Embed(context.Background(), "hi") - if err != nil { - t.Fatalf("Embed: %v", err) - } - if len(vec) != 2 { - t.Errorf("vec len = %d", len(vec)) - } - if p.Dimensions() != 2 { - t.Errorf("Dimensions() = %d, want 2", p.Dimensions()) - } -} - -func TestLibProvider_Dimensions_LazyOnFirstEmbed(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte(`{"data":[{"embedding":[1,1,1,1,1,1,1,1],"index":0}],"model":"x","usage":{"total_tokens":1}}`)) - })) - defer server.Close() - - p, err := NewProvider(Config{ - Provider: "voyage", - Model: "voyage-3-lite", - VoyageAPIKey: "test-key", - VoyageBaseURL: server.URL, - }) - if err != nil { - t.Fatalf("NewProvider: %v", err) - } - if p.Dimensions() != 0 { - t.Errorf("Dimensions() before Embed = %d, want 0", p.Dimensions()) - } - if _, err := p.Embed(context.Background(), "x"); err != nil { - t.Fatalf("Embed: %v", err) - } - if p.Dimensions() != 8 { - t.Errorf("Dimensions() after Embed = %d, want 8", p.Dimensions()) - } -} - -func TestLibProvider_Embed_PropagatesError(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusUnauthorized) - _, _ = w.Write([]byte(`{"error":"bad key"}`)) - })) - defer server.Close() - - p, err := NewProvider(Config{ - Provider: "voyage", - Model: "voyage-3-lite", - VoyageAPIKey: "wrong", - VoyageBaseURL: server.URL, - }) - if err != nil { - t.Fatalf("NewProvider: %v", err) - } - _, err = p.Embed(context.Background(), "x") - if err == nil { - t.Fatal("expected upstream error, got nil") - } -} diff --git a/internal/embedding/provider.go b/internal/embedding/provider.go deleted file mode 100644 index 7f8f938..0000000 --- a/internal/embedding/provider.go +++ /dev/null @@ -1,47 +0,0 @@ -/*------------------------------------------------------------------------- - * - * pgEdge Natural Language Agent - * - * Copyright (c) 2025 - 2026, pgEdge, Inc. - * This software is released under The PostgreSQL License - * - *------------------------------------------------------------------------- - */ - -package embedding - -import ( - "context" -) - -// Provider defines the interface for embedding generation -type Provider interface { - // Embed generates an embedding vector for the given text - Embed(ctx context.Context, text string) ([]float64, error) - - // Dimensions returns the number of dimensions in the embedding vector - Dimensions() int - - // ModelName returns the name of the model being used - ModelName() string - - // ProviderName returns the name of the provider (e.g., "voyage", "ollama", "openai") - ProviderName() string -} - -// Config holds configuration for embedding providers -type Config struct { - Provider string // "voyage", "ollama", or "openai" - Model string // Model name (provider-specific) - - // Voyage AI-specific - VoyageAPIKey string - VoyageBaseURL string // Base URL for Voyage API (optional, uses default if empty) - - // OpenAI-specific - OpenAIAPIKey string - OpenAIBaseURL string // Base URL for OpenAI API (optional, uses default if empty) - - // Ollama-specific - OllamaURL string -} diff --git a/internal/embedding/provider_test.go b/internal/embedding/provider_test.go deleted file mode 100644 index 00d55fe..0000000 --- a/internal/embedding/provider_test.go +++ /dev/null @@ -1,160 +0,0 @@ -/*------------------------------------------------------------------------- - * - * pgEdge Natural Language Agent - * - * Copyright (c) 2025 - 2026, pgEdge, Inc. - * This software is released under The PostgreSQL License - * - *------------------------------------------------------------------------- - */ - -package embedding - -import ( - "testing" -) - -func TestNewProvider_Voyage(t *testing.T) { - t.Run("valid config", func(t *testing.T) { - cfg := Config{ - Provider: "voyage", - Model: "voyage-3-lite", - VoyageAPIKey: "test-api-key-12345678", - } - - provider, err := NewProvider(cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if provider == nil { - t.Fatal("expected non-nil provider") - } - if provider.ProviderName() != "voyage" { - t.Errorf("expected provider name 'voyage', got %q", provider.ProviderName()) - } - }) - - t.Run("missing API key", func(t *testing.T) { - cfg := Config{ - Provider: "voyage", - Model: "voyage-3-lite", - } - - _, err := NewProvider(cfg) - if err == nil { - t.Fatal("expected error for missing API key") - } - }) -} - -func TestNewProvider_OpenAI(t *testing.T) { - t.Run("valid config", func(t *testing.T) { - cfg := Config{ - Provider: "openai", - Model: "text-embedding-3-small", - OpenAIAPIKey: "test-api-key-12345678", - } - - provider, err := NewProvider(cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if provider == nil { - t.Fatal("expected non-nil provider") - } - if provider.ProviderName() != "openai" { - t.Errorf("expected provider name 'openai', got %q", provider.ProviderName()) - } - }) - - t.Run("missing API key", func(t *testing.T) { - cfg := Config{ - Provider: "openai", - Model: "text-embedding-3-small", - } - - _, err := NewProvider(cfg) - if err == nil { - t.Fatal("expected error for missing API key") - } - }) -} - -func TestNewProvider_Ollama(t *testing.T) { - t.Run("valid config", func(t *testing.T) { - cfg := Config{ - Provider: "ollama", - Model: "nomic-embed-text", - OllamaURL: "http://localhost:11434", - } - - provider, err := NewProvider(cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if provider == nil { - t.Fatal("expected non-nil provider") - } - if provider.ProviderName() != "ollama" { - t.Errorf("expected provider name 'ollama', got %q", provider.ProviderName()) - } - }) - - t.Run("with defaults", func(t *testing.T) { - cfg := Config{ - Provider: "ollama", - } - - provider, err := NewProvider(cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if provider == nil { - t.Fatal("expected non-nil provider") - } - // Should use default model - if provider.ModelName() != "nomic-embed-text" { - t.Errorf("expected default model 'nomic-embed-text', got %q", provider.ModelName()) - } - }) -} - -func TestNewProvider_Unsupported(t *testing.T) { - cfg := Config{ - Provider: "unsupported", - } - - _, err := NewProvider(cfg) - if err == nil { - t.Fatal("expected error for unsupported provider") - } - if err.Error() != "unsupported embedding provider: unsupported (supported: voyage, openai, ollama)" { - t.Errorf("unexpected error message: %v", err) - } -} - -func TestConfigStruct(t *testing.T) { - cfg := Config{ - Provider: "voyage", - Model: "voyage-3", - VoyageAPIKey: "voyage-key", - OpenAIAPIKey: "openai-key", - OllamaURL: "http://localhost:11434", - } - - if cfg.Provider != "voyage" { - t.Errorf("expected provider 'voyage', got %q", cfg.Provider) - } - if cfg.Model != "voyage-3" { - t.Errorf("expected model 'voyage-3', got %q", cfg.Model) - } - if cfg.VoyageAPIKey != "voyage-key" { - t.Errorf("expected VoyageAPIKey 'voyage-key', got %q", cfg.VoyageAPIKey) - } - if cfg.OpenAIAPIKey != "openai-key" { - t.Errorf("expected OpenAIAPIKey 'openai-key', got %q", cfg.OpenAIAPIKey) - } - if cfg.OllamaURL != "http://localhost:11434" { - t.Errorf("expected OllamaURL 'http://localhost:11434', got %q", cfg.OllamaURL) - } -} From d0c5f119f2e4fac0ac40af94370e5c9046132e11 Mon Sep 17 00:00:00 2001 From: Dave Page Date: Wed, 27 May 2026 19:14:57 +0100 Subject: [PATCH 05/10] web: stream chat responses via /api/llm/v1/chat/stream (SSE) Adds an sseChat helper that POSTs to the library proxy's streaming endpoint, parses Server-Sent Events, and assembles the final response into the same shape the non-streaming /v1/chat endpoint returns. Text chunks update the assistant message bubble incrementally as they arrive. The agentic loop (chat -> tool_use -> tool_result -> chat ...) keeps working unchanged: sseChat returns once event: done arrives, the caller inspects stop_reason and dispatches the next iteration. Message.jsx now renders streamed partial text while keeping a thinking indicator visible, so the user sees progress as chunks arrive instead of just a spinner. Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/pgedge-pg-mcp-svr/main.go | 2 +- web/src/components/ChatInterface.jsx | 174 +++++++++-------- web/src/components/Message.jsx | 29 ++- web/src/utils/sseChat.js | 268 +++++++++++++++++++++++++++ web/src/utils/sseChat.test.js | 164 ++++++++++++++++ 5 files changed, 559 insertions(+), 78 deletions(-) create mode 100644 web/src/utils/sseChat.js create mode 100644 web/src/utils/sseChat.test.js diff --git a/cmd/pgedge-pg-mcp-svr/main.go b/cmd/pgedge-pg-mcp-svr/main.go index e9c2f9a..4570185 100644 --- a/cmd/pgedge-pg-mcp-svr/main.go +++ b/cmd/pgedge-pg-mcp-svr/main.go @@ -913,7 +913,7 @@ func main() { if len(providers) == 0 { return fmt.Errorf("LLM is enabled but no provider is configured; " + - "set at least one of anthropic_api_key, openai_api_key, or ollama_url") + "set at least one of anthropic_api_key, openai_api_key, gemini_api_key, or ollama_url") } p := proxy.New(proxy.Config{ diff --git a/web/src/components/ChatInterface.jsx b/web/src/components/ChatInterface.jsx index 44aa87c..002bcec 100644 --- a/web/src/components/ChatInterface.jsx +++ b/web/src/components/ChatInterface.jsx @@ -23,6 +23,7 @@ import ProviderSelector from './ProviderSelector'; import PromptPopover from './PromptPopover'; import WriteQueryConfirmDialog from './WriteQueryConfirmDialog'; import { isWriteQuery } from '../utils/queryClassify'; +import { sseChat } from '../utils/sseChat'; const MAX_AGENTIC_LOOPS = 50; // Compact if estimated tokens exceed this threshold. @@ -769,54 +770,78 @@ const ChatInterface = ({ conversations }) => { }); } - // Call LLM with compacted history. The proxy now lives - // under /api/llm/v1/* and always returns token usage in - // the response body (no `debug` flag is needed). - const llmResponse = await fetch('/api/llm/v1/chat', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${sessionToken}`, - }, - credentials: 'include', - signal: abortController.signal, - body: JSON.stringify({ + // Call LLM with compacted history via the streaming + // endpoint. The proxy lives under /api/llm/v1/* and + // always returns token usage on the done frame. Text + // chunks arrive incrementally and update the assistant + // bubble in real time; tool_use blocks are assembled + // by sseChat into the same shape /v1/chat returns so + // the agentic loop below is unchanged. + let streamedText = ''; + let llmData; + try { + llmData = await sseChat({ messages: compactedMessages, tools: toProxyTools(tools), provider: llmProviders.selectedProvider, model: llmProviders.selectedModel, - }), - }); - - // Handle session invalidation - if (llmResponse.status === 401) { - console.log('Session invalidated, logging out...'); - // Convert thinking message to error message before logout - setMessages(prev => { - const newMessages = [...prev]; - if (newMessages.length > 0 && newMessages[newMessages.length - 1].isThinking) { - const thinkingMsg = newMessages[newMessages.length - 1]; - newMessages[newMessages.length - 1] = { - role: 'assistant', - content: 'Error: Your session has expired. Please log in again.', - timestamp: new Date().toISOString(), - provider: thinkingMsg.provider, - model: thinkingMsg.model, - activity: thinkingMsg.activity || [], - isError: true - }; - } - return newMessages; + }, { + signal: abortController.signal, + sessionToken, + onTextChunk: (chunk) => { + if (!chunk) return; + streamedText += chunk; + // Stream the partial text into the thinking + // bubble so the user sees progress as it + // arrives. We keep isThinking=true so the + // indicator and activity panel keep working + // until the final response is committed. + setMessages(prev => { + const newMessages = [...prev]; + const last = newMessages[newMessages.length - 1]; + if (last && last.isThinking) { + newMessages[newMessages.length - 1] = { + ...last, + content: streamedText, + }; + } + return newMessages; + }); + }, }); - forceLogout(); - return; - } + } catch (streamErr) { + // Re-raise AbortError so the outer handler converts + // it to a "Request cancelled" message. + if (streamErr.name === 'AbortError') { + throw streamErr; + } - if (!llmResponse.ok) { - const errorText = await llmResponse.text(); + // Session expired. + if (streamErr.status === 401) { + console.log('Session invalidated, logging out...'); + setMessages(prev => { + const newMessages = [...prev]; + if (newMessages.length > 0 && newMessages[newMessages.length - 1].isThinking) { + const thinkingMsg = newMessages[newMessages.length - 1]; + newMessages[newMessages.length - 1] = { + role: 'assistant', + content: 'Error: Your session has expired. Please log in again.', + timestamp: new Date().toISOString(), + provider: thinkingMsg.provider, + model: thinkingMsg.model, + activity: thinkingMsg.activity || [], + isError: true + }; + } + return newMessages; + }); + forceLogout(); + return; + } - // Check for rate limit error - if (isRateLimitError(llmResponse.status, errorText)) { + // Rate limit handling mirrors the non-streaming path. + const errorText = streamErr.body || streamErr.message || ''; + if (isRateLimitError(streamErr.status, errorText)) { rateLimitRetryCount++; const rateLimitDetails = parseRateLimitError(errorText); const estimatedTokens = estimateTotalTokens(compactedMessages); @@ -828,7 +853,6 @@ const ChatInterface = ({ conversations }) => { console.log('[Rate Limit] First hit, pausing for 60 seconds before retry...'); console.log(`[Rate Limit] Cumulative tokens in last minute: ${cumulativeTokens}, requests: ${requestCount}`); - // Add rate limit activity activity.push({ type: 'rate_limit_pause', timestamp: new Date().toISOString(), @@ -838,7 +862,6 @@ const ChatInterface = ({ conversations }) => { requestCount: requestCount, }); - // Update thinking message to show we're waiting setMessages(prev => { const newMessages = [...prev]; if (newMessages.length > 0 && newMessages[newMessages.length - 1].isThinking) { @@ -850,14 +873,10 @@ const ChatInterface = ({ conversations }) => { return newMessages; }); - // Wait 60 seconds await delay(RATE_LIMIT_RETRY_DELAY_MS); - - // Don't increment loopCount for rate limit retries loopCount--; continue; } else { - // Second rate limit hit - give up with friendly message const tokenInfo = cumulativeTokens > 0 ? `Tokens used in last minute: ~${cumulativeTokens.toLocaleString()} (${requestCount} requests)` : `Estimated tokens in this request: ~${estimatedTokens.toLocaleString()}`; @@ -871,11 +890,9 @@ const ChatInterface = ({ conversations }) => { } } - throw new Error(`LLM request failed: ${llmResponse.status} ${errorText}`); + throw new Error(`LLM request failed: ${streamErr.message}`); } - const llmData = await llmResponse.json(); - // Track token usage for rate limit awareness. The proxy // now returns usage under the `usage` key (was // `token_usage`); fall back for safety. @@ -1282,31 +1299,44 @@ const ChatInterface = ({ conversations }) => { }); } - // Make LLM request with compacted history. The proxy - // always returns token usage (no `debug` flag is needed). - const response = await fetch('/api/llm/v1/chat', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${sessionToken}`, - }, - body: JSON.stringify({ + // Make LLM request with compacted history via the + // streaming endpoint. Text chunks update the thinking + // bubble as they arrive; tool_use blocks are assembled + // by sseChat into the same shape /v1/chat returns. + let streamedText = ''; + let llmData; + try { + llmData = await sseChat({ messages: compactedMessages, tools: toProxyTools(tools), provider: llmProviders.selectedProvider, model: llmProviders.selectedModel, - }), - }); - - if (!response.ok) { - if (response.status === 401) { + }, { + sessionToken, + onTextChunk: (chunk) => { + if (!chunk) return; + streamedText += chunk; + setMessages(prev => { + const newMessages = [...prev]; + const last = newMessages[newMessages.length - 1]; + if (last && last.isThinking) { + newMessages[newMessages.length - 1] = { + ...last, + content: streamedText, + }; + } + return newMessages; + }); + }, + }); + } catch (streamErr) { + if (streamErr.status === 401) { forceLogout(); throw new Error('Session expired. Please login again.'); } - const errorText = await response.text(); - // Check for rate limit error - if (isRateLimitError(response.status, errorText)) { + const errorText = streamErr.body || streamErr.message || ''; + if (isRateLimitError(streamErr.status, errorText)) { rateLimitRetryCount++; const rateLimitDetails = parseRateLimitError(errorText); const estimatedTokens = estimateTotalTokens(compactedMessages); @@ -1318,7 +1348,6 @@ const ChatInterface = ({ conversations }) => { console.log('[Rate Limit] First hit, pausing for 60 seconds before retry...'); console.log(`[Rate Limit] Cumulative tokens in last minute: ${cumulativeTokens}, requests: ${requestCount}`); - // Add rate limit activity activity.push({ type: 'rate_limit_pause', timestamp: new Date().toISOString(), @@ -1328,7 +1357,6 @@ const ChatInterface = ({ conversations }) => { requestCount: requestCount, }); - // Update thinking message to show we're waiting setMessages(prev => { const newMessages = [...prev]; if (newMessages.length > 0 && newMessages[newMessages.length - 1].isThinking) { @@ -1340,14 +1368,10 @@ const ChatInterface = ({ conversations }) => { return newMessages; }); - // Wait 60 seconds await delay(RATE_LIMIT_RETRY_DELAY_MS); - - // Don't increment loopCount for rate limit retries loopCount--; continue; } else { - // Second rate limit hit - give up with friendly message const tokenInfo = cumulativeTokens > 0 ? `Tokens used in last minute: ~${cumulativeTokens.toLocaleString()} (${requestCount} requests)` : `Estimated tokens in this request: ~${estimatedTokens.toLocaleString()}`; @@ -1361,11 +1385,9 @@ const ChatInterface = ({ conversations }) => { } } - throw new Error(`Server error: ${errorText}`); + throw new Error(`Server error: ${errorText || streamErr.message}`); } - const llmData = await response.json(); - // Track token usage for rate limit awareness. Proxy now // returns usage under `usage`; fall back for safety. const usage = llmData.usage || llmData.token_usage; diff --git a/web/src/components/Message.jsx b/web/src/components/Message.jsx index b2804f2..37c5f48 100644 --- a/web/src/components/Message.jsx +++ b/web/src/components/Message.jsx @@ -300,8 +300,35 @@ const Message = React.memo(({ message, showActivity, renderMarkdown, debug }) => > {message.content} - ) : message.isThinking ? ( + ) : message.isThinking && !message.content ? ( + ) : message.isThinking && message.content ? ( + // Streaming in progress: render the partial + // text and keep the thinking indicator below + // so the user sees progress is ongoing. + <> + {renderMarkdown ? ( + + {message.content} + + ) : ( + + {message.content} + + )} + + + + ) : renderMarkdown ? ( finalises the assembled response (carries stop_reason + * and usage). + * - "error" -> aborts the stream and rejects the returned promise. + * + * Chunk types within "message" events: + * + * - "text" -> appended to the current text block; also + * surfaced via the onTextChunk callback so the + * UI can update incrementally. + * - "tool_use_start" -> begins a new tool_use block (id + name). + * - "tool_use_delta" -> accumulates partial JSON input string for + * the current tool_use; parsed at done. + * + * @param {object} body - Request body matching the /v1/chat schema + * (messages, tools, provider, model, etc.). + * @param {object} [options] - Optional knobs. + * @param {AbortSignal} [options.signal] - Abort signal forwarded to + * fetch. + * @param {string} [options.sessionToken] - Bearer token used for the + * Authorization header. + * @param {Function} [options.onTextChunk] - Called with each text + * fragment (string) as it arrives, suitable for incremental UI + * updates. + * @returns {Promise} Resolves with the assembled + * { content, stop_reason, usage } response. + */ +export async function sseChat(body, options = {}) { + const { signal, sessionToken, onTextChunk } = options; + + const headers = { + 'Content-Type': 'application/json', + 'Accept': 'text/event-stream', + }; + if (sessionToken) { + headers['Authorization'] = `Bearer ${sessionToken}`; + } + + const response = await fetch('/api/llm/v1/chat/stream', { + method: 'POST', + headers, + credentials: 'include', + signal, + body: JSON.stringify(body), + }); + + if (!response.ok) { + const text = await response.text(); + const err = new Error(`HTTP ${response.status}: ${text}`); + err.status = response.status; + err.body = text; + throw err; + } + + if (!response.body || typeof response.body.getReader !== 'function') { + throw new Error('Streaming response body is not readable'); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + // Assembly state mirrors the non-streaming endpoint's response. + const assembled = { + content: [], + stop_reason: 'end_turn', + usage: null, + }; + let pendingTextBlock = null; + // Ordered list of tool_use ids so we preserve emission order at done. + const toolOrder = []; + // Map of tool_use id -> { name, partial } accumulator. + const pendingTools = new Map(); + let currentToolId = null; + let streamError = null; + + const flushPendingText = () => { + if (pendingTextBlock) { + assembled.content.push(pendingTextBlock); + pendingTextBlock = null; + } + }; + + const finalise = () => { + flushPendingText(); + for (const id of toolOrder) { + const info = pendingTools.get(id); + if (!info) continue; + let input = {}; + const partial = info.partial || ''; + if (partial.trim().length > 0) { + try { + input = JSON.parse(partial); + } catch (_err) { + // Leave input as the raw partial string so the + // caller can still inspect what was attempted. + input = { _raw: partial }; + } + } + assembled.content.push({ + type: 'tool_use', + tool_use: { id, name: info.name, input }, + }); + } + }; + + const handleEvent = (eventType, dataLines) => { + if (dataLines.length === 0) return; + const dataStr = dataLines.join('\n'); + let parsed; + try { + parsed = JSON.parse(dataStr); + } catch (_err) { + // Ignore malformed payloads; the server may emit comments + // or heartbeats we don't recognise. + return; + } + + if (eventType === 'done') { + if (parsed.stop_reason) { + assembled.stop_reason = parsed.stop_reason; + } + if (parsed.usage) { + assembled.usage = parsed.usage; + } + // If the done payload also carries assembled content + // blocks, prefer the server's view. + if (Array.isArray(parsed.content) && parsed.content.length > 0) { + assembled.content = parsed.content; + // Don't run finalise() in this case; the server already + // provided the assembled shape. + pendingTextBlock = null; + pendingTools.clear(); + toolOrder.length = 0; + } else { + finalise(); + } + return; + } + + if (eventType === 'error') { + const msg = parsed.error || parsed.message || 'stream error'; + streamError = new Error(msg); + return; + } + + // Default "message" events carry chunk payloads. + switch (parsed.type) { + case 'text': { + if (!pendingTextBlock) { + pendingTextBlock = { type: 'text', text: '' }; + } + const chunk = parsed.text || ''; + pendingTextBlock.text += chunk; + if (chunk && typeof onTextChunk === 'function') { + try { + onTextChunk(chunk); + } catch (_err) { + // Don't let UI callbacks abort the stream. + } + } + break; + } + case 'tool_use_start': { + // Flush any pending text so the assembled content + // preserves the relative ordering of blocks. + flushPendingText(); + const tu = parsed.tool_use || {}; + const id = tu.id || `tu_${pendingTools.size}`; + currentToolId = id; + if (!pendingTools.has(id)) { + toolOrder.push(id); + } + pendingTools.set(id, { + name: tu.name || '', + partial: '', + }); + break; + } + case 'tool_use_delta': { + const id = parsed.id || currentToolId; + if (id && pendingTools.has(id)) { + const info = pendingTools.get(id); + info.partial += parsed.partial || ''; + } + break; + } + default: + // Other chunk types (image, etc.) ignored for now. + break; + } + }; + + const processFrame = (frame) => { + let eventType = 'message'; + const dataLines = []; + for (const rawLine of frame.split('\n')) { + const line = rawLine.replace(/\r$/, ''); + if (line.length === 0) continue; + if (line.startsWith(':')) continue; // SSE comment + if (line.startsWith('event:')) { + eventType = line.slice(6).trim(); + } else if (line.startsWith('data:')) { + // Per SSE spec, strip a single leading space if present. + let payload = line.slice(5); + if (payload.startsWith(' ')) payload = payload.slice(1); + dataLines.push(payload); + } + // Other fields (id:, retry:) are ignored. + } + handleEvent(eventType, dataLines); + }; + + try { + while (true) { + const { value, done } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + let idx; + while ((idx = buffer.indexOf('\n\n')) !== -1) { + const frame = buffer.slice(0, idx); + buffer = buffer.slice(idx + 2); + processFrame(frame); + if (streamError) break; + } + if (streamError) break; + } + // Flush any trailing data that wasn't terminated by \n\n. + if (!streamError && buffer.trim().length > 0) { + processFrame(buffer); + buffer = ''; + } + } finally { + try { + reader.releaseLock(); + } catch (_err) { + // ignore + } + } + + if (streamError) { + throw streamError; + } + + return assembled; +} + +export default sseChat; diff --git a/web/src/utils/sseChat.test.js b/web/src/utils/sseChat.test.js new file mode 100644 index 0000000..5594b8a --- /dev/null +++ b/web/src/utils/sseChat.test.js @@ -0,0 +1,164 @@ +/*------------------------------------------------------------------------- + * + * pgEdge MCP Client - SSE Chat Streaming Helper Tests + * + * Copyright (c) 2025 - 2026, pgEdge, Inc. + * This software is released under The PostgreSQL License + * + *------------------------------------------------------------------------- + */ + +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { sseChat } from './sseChat'; + +/** + * Builds a Response-like object whose body is a ReadableStream that + * yields the supplied byte chunks. Use this to fake fetch() returning + * a server-sent-event stream. + * + * @param {string[]} chunks - Strings to emit in order; each becomes a + * Uint8Array enqueued onto the stream. + * @param {object} [opts] - Optional overrides (status, statusText, ok). + * @returns {object} - Response-like object. + */ +const buildStreamResponse = (chunks, opts = {}) => { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + controller.enqueue(encoder.encode(chunk)); + } + controller.close(); + }, + }); + return { + ok: opts.ok ?? true, + status: opts.status ?? 200, + statusText: opts.statusText ?? 'OK', + body: stream, + text: async () => opts.text ?? '', + }; +}; + +describe('sseChat', () => { + let originalFetch; + + beforeEach(() => { + originalFetch = globalThis.fetch; + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + vi.restoreAllMocks(); + }); + + it('assembles text chunks into a single text block', async () => { + globalThis.fetch = vi.fn().mockResolvedValue(buildStreamResponse([ + 'data: {"type":"text","text":"Hello"}\n\n', + 'data: {"type":"text","text":" world"}\n\n', + 'event: done\ndata: {"stop_reason":"end_turn","usage":{"prompt_tokens":5,"completion_tokens":2,"total_tokens":7}}\n\n', + ])); + + const seen = []; + const result = await sseChat( + { messages: [] }, + { onTextChunk: (c) => seen.push(c) }, + ); + + expect(seen).toEqual(['Hello', ' world']); + expect(result.stop_reason).toBe('end_turn'); + expect(result.usage).toEqual({ + prompt_tokens: 5, + completion_tokens: 2, + total_tokens: 7, + }); + expect(result.content).toEqual([ + { type: 'text', text: 'Hello world' }, + ]); + }); + + it('assembles tool_use_start + tool_use_delta into a tool_use block', async () => { + globalThis.fetch = vi.fn().mockResolvedValue(buildStreamResponse([ + 'data: {"type":"text","text":"Looking up weather."}\n\n', + 'data: {"type":"tool_use_start","tool_use":{"id":"tu_1","name":"get_weather","input":null}}\n\n', + 'data: {"type":"tool_use_delta","partial":"{\\"city\\":"}\n\n', + 'data: {"type":"tool_use_delta","partial":"\\"London\\"}"}\n\n', + 'event: done\ndata: {"stop_reason":"tool_use","usage":{"prompt_tokens":10,"completion_tokens":5,"total_tokens":15}}\n\n', + ])); + + const result = await sseChat({ messages: [] }); + + expect(result.stop_reason).toBe('tool_use'); + expect(result.content).toEqual([ + { type: 'text', text: 'Looking up weather.' }, + { + type: 'tool_use', + tool_use: { + id: 'tu_1', + name: 'get_weather', + input: { city: 'London' }, + }, + }, + ]); + }); + + it('throws when the server emits an error event', async () => { + globalThis.fetch = vi.fn().mockResolvedValue(buildStreamResponse([ + 'data: {"type":"text","text":"partial"}\n\n', + 'event: error\ndata: {"error":"upstream timeout"}\n\n', + ])); + + await expect(sseChat({ messages: [] })).rejects.toThrow('upstream timeout'); + }); + + it('throws when the response is not ok', async () => { + globalThis.fetch = vi.fn().mockResolvedValue({ + ok: false, + status: 500, + statusText: 'Internal Server Error', + body: null, + text: async () => 'boom', + }); + + await expect(sseChat({ messages: [] })).rejects.toThrow(/HTTP 500/); + }); + + it('forwards Authorization header when sessionToken is provided', async () => { + const fetchMock = vi.fn().mockResolvedValue(buildStreamResponse([ + 'event: done\ndata: {"stop_reason":"end_turn"}\n\n', + ])); + globalThis.fetch = fetchMock; + + await sseChat({ messages: [] }, { sessionToken: 'abc123' }); + + expect(fetchMock).toHaveBeenCalledTimes(1); + const [url, init] = fetchMock.mock.calls[0]; + expect(url).toBe('/api/llm/v1/chat/stream'); + expect(init.method).toBe('POST'); + expect(init.headers['Authorization']).toBe('Bearer abc123'); + expect(init.headers['Accept']).toBe('text/event-stream'); + expect(init.headers['Content-Type']).toBe('application/json'); + }); + + it('handles frames split across multiple network chunks', async () => { + globalThis.fetch = vi.fn().mockResolvedValue(buildStreamResponse([ + 'data: {"type":"text","text":"He', + 'llo"}\n\ndata: {"type":"text","text":" world"}\n', + '\nevent: done\ndata: {"stop_reason":"end_turn"}\n\n', + ])); + + const result = await sseChat({ messages: [] }); + expect(result.content).toEqual([{ type: 'text', text: 'Hello world' }]); + expect(result.stop_reason).toBe('end_turn'); + }); + + it('prefers content array from done payload when present', async () => { + globalThis.fetch = vi.fn().mockResolvedValue(buildStreamResponse([ + 'data: {"type":"text","text":"streamed"}\n\n', + 'event: done\ndata: {"stop_reason":"end_turn","content":[{"type":"text","text":"final"}]}\n\n', + ])); + + const result = await sseChat({ messages: [] }); + expect(result.content).toEqual([{ type: 'text', text: 'final' }]); + }); +}); From 935ed260eb4cb04c524f68422b481fe1707552f6 Mon Sep 17 00:00:00 2001 From: Dave Page Date: Wed, 27 May 2026 19:23:27 +0100 Subject: [PATCH 06/10] docs: changelog entries for streaming, Gemini, and embedding cleanup (PR 4) Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/changelog.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/docs/changelog.md b/docs/changelog.md index 42a9691..e3d17c8 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -63,6 +63,21 @@ and this project adheres to exposed alongside the non-streaming endpoint. The non-streaming `/v1/chat` endpoint remains available for callers that prefer it. +- The web chat interface now consumes the streaming endpoint + `/api/llm/v1/chat/stream` (Server-Sent Events) and renders the + assistant response incrementally as chunks arrive. The + non-streaming endpoint stays available for callers that prefer + it. A new `web/src/utils/sseChat.js` helper handles the SSE + parsing and assembles the final response into the same shape + the non-streaming endpoint returns, so the agentic chat loop is + unchanged. + +- The tools `search_knowledgebase`, `generate_embedding`, and + `similarity_search` now construct their embedding client + directly via `llm.NewClient` rather than going through the old + `embedding.NewProvider` wrapper. The `internal/embedding/` + package is deleted entirely. + ### Fixed - Metadata loader now tolerates tables with zero columns @@ -126,6 +141,12 @@ and this project adheres to token and needing access to multiple databases). The setting is honored by both the single-database and multi-database initialization paths. (#167) + +- Google Gemini is now a supported LLM provider. Configure via + `gemini_api_key` / `gemini_api_key_file` in the config file or + via the `PGEDGE_GEMINI_API_KEY` / `GEMINI_API_KEY` environment + variables. + ### Fixed - Fixed port detection on Windows; the installer now reliably From a20c4d3065a66f55dbb95befccdb4d617ad81c3e Mon Sep 17 00:00:00 2001 From: Dave Page Date: Wed, 27 May 2026 19:32:55 +0100 Subject: [PATCH 07/10] tools: rephrase embedding-config error messages for lint compliance staticcheck (ST1005) rejects error strings that begin with a capital letter. The two missing-API-key messages copied across from the old embedding factory both started with the provider name. Switch to a "missing for embedding provider ''" phrasing that starts with a lowercase verb. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/changelog.md | 1 + internal/tools/embed_client.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/changelog.md b/docs/changelog.md index e3d17c8..4d5c0f8 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -26,6 +26,7 @@ and this project adheres to - OpenAI models that require the Responses API (`gpt-5-*`, `o1-*`, `o3-*`) are now supported transparently; the library routes them to `/v1/responses` automatically based on the model name. + - Embedding provider clients (Voyage, OpenAI, Ollama) now use the shared [`pgedge-go-llm-lib`](https://github.com/pgEdge/pgedge-go-llm-lib) diff --git a/internal/tools/embed_client.go b/internal/tools/embed_client.go index de9539f..f919a52 100644 --- a/internal/tools/embed_client.go +++ b/internal/tools/embed_client.go @@ -42,7 +42,7 @@ func newEmbedClient(cfg embedClientConfig) (llm.Client, string, error) { switch cfg.Provider { case "voyage": if cfg.VoyageAPIKey == "" { - return nil, "", fmt.Errorf("Voyage AI API key is required when provider is 'voyage'") + return nil, "", fmt.Errorf("missing Voyage AI API key for embedding provider 'voyage'") } model := cfg.Model if model == "" { @@ -55,7 +55,7 @@ func newEmbedClient(cfg embedClientConfig) (llm.Client, string, error) { } case "openai": if cfg.OpenAIAPIKey == "" { - return nil, "", fmt.Errorf("OpenAI API key is required when provider is 'openai'") + return nil, "", fmt.Errorf("missing OpenAI API key for embedding provider 'openai'") } opts = llm.Options{ APIKey: cfg.OpenAIAPIKey, From e058da4c767cd585e0c5e82b98dc2dabfb4c245e Mon Sep 17 00:00:00 2001 From: Dave Page Date: Thu, 28 May 2026 09:36:55 +0100 Subject: [PATCH 08/10] web: add AbortController to handlePromptExecute sseChat call handlePromptExecute previously had no abort handling, so the streaming sseChat call could not be cancelled by the user. Mirror the AbortController pattern already used in handleSend: create a controller, pass its signal to sseChat, treat AbortError as a benign cancellation in the catch block. Co-Authored-By: Claude Opus 4.7 (1M context) --- web/src/components/ChatInterface.jsx | 39 ++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/web/src/components/ChatInterface.jsx b/web/src/components/ChatInterface.jsx index 002bcec..4bb8fd8 100644 --- a/web/src/components/ChatInterface.jsx +++ b/web/src/components/ChatInterface.jsx @@ -1202,6 +1202,14 @@ const ChatInterface = ({ conversations }) => { setExecutingPrompt(true); + // Create AbortController for this request so the existing + // cancel button (wired to handleCancel via abortControllerRef) + // can terminate an in-flight streaming response. Share the ref + // with handleSend; only one flow can be active at a time + // because both gate on the `loading` flag. + const abortController = new AbortController(); + abortControllerRef.current = abortController; + try { // Get the prompt with arguments from MCP server const promptResult = await mcpClient.getPrompt(promptName, args); @@ -1312,6 +1320,7 @@ const ChatInterface = ({ conversations }) => { provider: llmProviders.selectedProvider, model: llmProviders.selectedModel, }, { + signal: abortController.signal, sessionToken, onTextChunk: (chunk) => { if (!chunk) return; @@ -1330,6 +1339,12 @@ const ChatInterface = ({ conversations }) => { }, }); } catch (streamErr) { + // Re-raise AbortError so the outer handler converts + // it to a "Request cancelled" message. + if (streamErr.name === 'AbortError') { + throw streamErr; + } + if (streamErr.status === 401) { forceLogout(); throw new Error('Session expired. Please login again.'); @@ -1557,6 +1572,29 @@ const ChatInterface = ({ conversations }) => { throw new Error('Maximum tool execution loops reached'); } } catch (err) { + // Check if this was a user cancellation + if (err.name === 'AbortError') { + console.log('Prompt execution cancelled by user'); + // Convert thinking message to cancelled message + setMessages(prev => { + const newMessages = [...prev]; + if (newMessages.length > 0 && newMessages[newMessages.length - 1].isThinking) { + const thinkingMsg = newMessages[newMessages.length - 1]; + newMessages[newMessages.length - 1] = { + role: 'assistant', + content: 'Request cancelled', + timestamp: new Date().toISOString(), + provider: thinkingMsg.provider, + model: thinkingMsg.model, + activity: thinkingMsg.activity || [], + isCancelled: true + }; + } + return newMessages; + }); + return; + } + console.error('Prompt execution error:', err); // Convert thinking message to error message (preserve activity for debugging) @@ -1580,6 +1618,7 @@ const ChatInterface = ({ conversations }) => { } finally { setExecutingPrompt(false); setLoading(false); + abortControllerRef.current = null; } }, [mcpClient, loading, messages, sessionToken, tools, llmProviders.selectedProvider, llmProviders.selectedModel, forceLogout, refreshTools, fetchDatabases, isWriteAccessEnabled, requestWriteConfirmation]); From 3f6070ff8c8a43d1ea14abfba50d0405a2c32203 Mon Sep 17 00:00:00 2001 From: Dave Page Date: Thu, 28 May 2026 09:37:27 +0100 Subject: [PATCH 09/10] config+tools: address CodeRabbit feedback on PR #175 - mergeConfig() now copies the Gemini API key and key-file fields alongside the other LLM provider fields. Without this, a gemini_api_key in the YAML config would be silently dropped at load time. - newEmbedClient() normalises cfg.Provider via TrimSpace+ToLower before the switch and llm.NewClient call. Inputs like "OpenAI" or " openai " are now accepted; the error message and llm.NewClient receive the normalised value. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/config/config.go | 6 ++++++ internal/tools/embed_client.go | 10 ++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index b741e2e..3cf58a5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -752,6 +752,12 @@ func mergeConfig(dest, src *Config) { if src.LLM.OllamaURL != "" { dest.LLM.OllamaURL = src.LLM.OllamaURL } + if src.LLM.GeminiAPIKey != "" { + dest.LLM.GeminiAPIKey = src.LLM.GeminiAPIKey + } + if src.LLM.GeminiAPIKeyFile != "" { + dest.LLM.GeminiAPIKeyFile = src.LLM.GeminiAPIKeyFile + } if src.LLM.MaxTokens != 0 { dest.LLM.MaxTokens = src.LLM.MaxTokens } diff --git a/internal/tools/embed_client.go b/internal/tools/embed_client.go index f919a52..842866b 100644 --- a/internal/tools/embed_client.go +++ b/internal/tools/embed_client.go @@ -12,6 +12,7 @@ package tools import ( "fmt" + "strings" "github.com/pgEdge/pgedge-go-llm-lib/llm" _ "github.com/pgEdge/pgedge-go-llm-lib/llm/provider/ollama" @@ -38,8 +39,9 @@ type embedClientConfig struct { // and the resolved model name (after defaults are applied) so callers // can display it without re-deriving the default logic. func newEmbedClient(cfg embedClientConfig) (llm.Client, string, error) { + provider := strings.ToLower(strings.TrimSpace(cfg.Provider)) var opts llm.Options - switch cfg.Provider { + switch provider { case "voyage": if cfg.VoyageAPIKey == "" { return nil, "", fmt.Errorf("missing Voyage AI API key for embedding provider 'voyage'") @@ -78,13 +80,13 @@ func newEmbedClient(cfg embedClientConfig) (llm.Client, string, error) { default: return nil, "", fmt.Errorf( "unsupported embedding provider: %s (supported: voyage, openai, ollama)", - cfg.Provider, + provider, ) } - client, err := llm.NewClient(cfg.Provider, opts) + client, err := llm.NewClient(provider, opts) if err != nil { - return nil, "", fmt.Errorf("create %s embedding client: %w", cfg.Provider, err) + return nil, "", fmt.Errorf("create %s embedding client: %w", provider, err) } return client, opts.Model, nil } From 4e67f52d4e4905c208bc798654dcb4ae7fd7d5e7 Mon Sep 17 00:00:00 2001 From: Dave Page Date: Fri, 12 Jun 2026 09:46:40 +0100 Subject: [PATCH 10/10] build(deps): pin pgEdge go-llm-lib to v0.1.0 Bump the dependency from the 28 May pseudo-version to the tagged v0.1.0 release. The breaking changes in v0.1.0 (variadic ListModels, the new Rerank/EmbedMultimodal interface methods, and the snake_case ToolChoice proxy wire format) do not affect this code: we consume llm.Client rather than implementing it, and we do not send tool_choice through the proxy. --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index b9685f8..b11bbf7 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/fsnotify/fsnotify v1.9.0 github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.6 - github.com/pgEdge/pgedge-go-llm-lib v0.0.0-20260528080856-17b5f11c6044 + github.com/pgEdge/pgedge-go-llm-lib v0.1.0 golang.org/x/crypto v0.49.0 golang.org/x/term v0.41.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index 6442ce9..5dfbb7a 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,8 @@ github.com/muesli/termenv v0.16.0 h1:S5AlUN9dENB57rsbnkPyfdGuWIlkmzJjbFf0Tf5FWUc github.com/muesli/termenv v0.16.0/go.mod h1:ZRfOIKPFDYQoDFF4Olj7/QJbW60Ol/kL1pU3VfY/Cnk= github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= -github.com/pgEdge/pgedge-go-llm-lib v0.0.0-20260528080856-17b5f11c6044 h1:VvcBNVFKWnVoLdJfZQhXkE42sefQWhNTeB8mPYU5U/k= -github.com/pgEdge/pgedge-go-llm-lib v0.0.0-20260528080856-17b5f11c6044/go.mod h1:41rtSLcp/wwSUlBqetVHLQKisDZfzBmgSWt84WA+Eys= +github.com/pgEdge/pgedge-go-llm-lib v0.1.0 h1:IiCWA99un19rwdB1hlDPOm2Ft+43LsCvp0oAhbBM/Nk= +github.com/pgEdge/pgedge-go-llm-lib v0.1.0/go.mod h1:41rtSLcp/wwSUlBqetVHLQKisDZfzBmgSWt84WA+Eys= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=