Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,17 @@ export function getStatefulMarkerAndIndex(modelId: string, messages: readonly Ra
}
}
return undefined;
}
}

/**
* Finds the message index of a specific stateful marker value in the message history.
* Returns the index if found, undefined otherwise.
*/
export function getIndexOfStatefulMarker(markerValue: string, messages: readonly Raw.ChatMessage[]): number | undefined {
for (const entry of getAllStatefulMarkersAndIndicies(messages)) {
if (entry.statefulMarker.marker === markerValue) {
return entry.index;
}
}
return undefined;
}
40 changes: 31 additions & 9 deletions extensions/copilot/src/platform/endpoint/node/responsesApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import { TelemetryData } from '../../telemetry/common/telemetryData';
import { getVerbosityForModelSync } from '../common/chatModelCapabilities';
import { rawPartAsCompactionData } from '../common/compactionDataContainer';
import { rawPartAsPhaseData } from '../common/phaseDataContainer';
import { getStatefulMarkerAndIndex } from '../common/statefulMarkerContainer';
import { getIndexOfStatefulMarker, getStatefulMarkerAndIndex } from '../common/statefulMarkerContainer';
import { rawPartAsThinkingData } from '../common/thinkingDataContainer';
import { IChatWebSocketManager } from '../../networking/node/chatWebSocketManager';

export function getResponsesApiCompactionThreshold(configService: IConfigurationService, expService: IExperimentationService, endpoint: IChatEndpoint): number | undefined {
const contextManagementEnabled = configService.getExperimentBasedConfig(ConfigKey.ResponsesApiContextManagementEnabled, expService) && !modelsWithoutResponsesContextManagement.has(endpoint.family);
Expand All @@ -49,7 +50,7 @@ export function createResponsesRequestBody(accessor: ServicesAccessor, options:

const body: IEndpointBody = {
model,
...rawMessagesToResponseAPI(model, options.messages, !!options.ignoreStatefulMarker),
...rawMessagesToResponseAPI(model, options.messages, !!options.ignoreStatefulMarker, resolveWebSocketStatefulMarker(accessor, options)),
stream: true,
tools: options.requestOptions?.tools?.map((tool): OpenAI.Responses.FunctionTool & OpenAiResponsesFunctionTool => ({
...tool.function,
Expand Down Expand Up @@ -131,22 +132,43 @@ interface LatestCompactionOutput {
readonly outputIndex: number;
}

function rawMessagesToResponseAPI(modelId: string, messages: readonly Raw.ChatMessage[], ignoreStatefulMarker: boolean): { input: OpenAI.Responses.ResponseInputItem[]; previous_response_id?: string } {
function resolveWebSocketStatefulMarker(accessor: ServicesAccessor, options: ICreateEndpointBodyOptions): string | undefined {
if (options.ignoreStatefulMarker || !options.useWebSocket || !options.conversationId) {
return undefined;
}
return accessor.get(IChatWebSocketManager).getStatefulMarker(options.conversationId);
}

function rawMessagesToResponseAPI(modelId: string, messages: readonly Raw.ChatMessage[], ignoreStatefulMarker: boolean, webSocketStatefulMarker: string | undefined): { input: OpenAI.Responses.ResponseInputItem[]; previous_response_id?: string } {
const latestCompactionMessageIndex = getLatestCompactionMessageIndex(messages);
const latestCompactionMessage = latestCompactionMessageIndex !== undefined ? createCompactionRoundTripMessage(messages[latestCompactionMessageIndex]) : undefined;
const statefulMarkerAndIndex = !ignoreStatefulMarker && getStatefulMarkerAndIndex(modelId, messages);

let previousResponseId: string | undefined;
if (statefulMarkerAndIndex) {
previousResponseId = statefulMarkerAndIndex.statefulMarker;
let markerIndex: number | undefined;

if (webSocketStatefulMarker) {
// WebSocket path: use the connection's current stateful marker if present in messages
markerIndex = getIndexOfStatefulMarker(webSocketStatefulMarker, messages);
if (markerIndex !== undefined) {
previousResponseId = webSocketStatefulMarker;
}
} else if (!ignoreStatefulMarker) {
// HTTP path: look up the latest marker for this model from messages
const statefulMarkerAndIndex = getStatefulMarkerAndIndex(modelId, messages);
if (statefulMarkerAndIndex) {
previousResponseId = statefulMarkerAndIndex.statefulMarker;
markerIndex = statefulMarkerAndIndex.index;
}
}

if (markerIndex !== undefined) {
// Requests that resume from previous_response_id send only post-marker history,
// but they still need the latest compaction item even when that item predates
// the marker. This keeps both websocket and non-websocket traffic aligned.
messages = messages.slice(statefulMarkerAndIndex.index + 1);
messages = messages.slice(markerIndex + 1);
if (latestCompactionMessageIndex !== undefined) {
if (latestCompactionMessageIndex > statefulMarkerAndIndex.index) {
messages = messages.slice(latestCompactionMessageIndex - (statefulMarkerAndIndex.index + 1));
if (latestCompactionMessageIndex > markerIndex) {
messages = messages.slice(latestCompactionMessageIndex - (markerIndex + 1));
} else if (latestCompactionMessage) {
messages = [latestCompactionMessage, ...messages];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { ILogService } from '../../../log/common/logService';
import { isOpenAIContextManagementResponse } from '../../../networking/common/fetch';
import { IChatEndpoint, ICreateEndpointBodyOptions } from '../../../networking/common/networking';
import { openAIContextManagementCompactionType, OpenAIContextManagementResponse } from '../../../networking/common/openai';
import { IChatWebSocketManager, NullChatWebSocketManager } from '../../../networking/node/chatWebSocketManager';
import { TelemetryData } from '../../../telemetry/common/telemetryData';
import { SpyingTelemetryService } from '../../../telemetry/node/spyingTelemetryService';
import { createFakeStreamResponse } from '../../../test/node/fetcher';
Expand Down Expand Up @@ -307,6 +308,9 @@ describe('createResponsesRequestBody', () => {

it('still slices websocket requests by stateful marker index when compaction is disabled', () => {
const services = createPlatformServices();
const wsManager = new NullChatWebSocketManager();
wsManager.getStatefulMarker = () => 'resp-prev';
services.set(IChatWebSocketManager, wsManager);
const accessor = services.createTestingAccessor();
const instantiationService = accessor.get(IInstantiationService);
const endpointWithoutCompaction = { ...testEndpoint, family: 'gpt-5' as const };
Expand All @@ -322,7 +326,7 @@ describe('createResponsesRequestBody', () => {
},
];

const webSocketBody = instantiationService.invokeFunction(servicesAccessor => createResponsesRequestBody(servicesAccessor, createRequestOptions(messages, true), endpointWithoutCompaction.model, endpointWithoutCompaction));
const webSocketBody = instantiationService.invokeFunction(servicesAccessor => createResponsesRequestBody(servicesAccessor, { ...createRequestOptions(messages, true), conversationId: 'conv-1' }, endpointWithoutCompaction.model, endpointWithoutCompaction));

expect(webSocketBody.previous_response_id).toBe('resp-prev');
expect(webSocketBody.input).toHaveLength(1);
Expand All @@ -337,6 +341,9 @@ describe('createResponsesRequestBody', () => {

it('includes the newest compaction item in websocket requests when it predates the stateful marker', () => {
const services = createPlatformServices();
const wsManager = new NullChatWebSocketManager();
wsManager.getStatefulMarker = () => 'resp-prev';
services.set(IChatWebSocketManager, wsManager);
const accessor = services.createTestingAccessor();
const instantiationService = accessor.get(IInstantiationService);
const latestCompaction = createCompactionResponse('cmp_ws', 'enc_ws');
Expand All @@ -353,7 +360,7 @@ describe('createResponsesRequestBody', () => {
},
];

const webSocketBody = instantiationService.invokeFunction(servicesAccessor => createResponsesRequestBody(servicesAccessor, createRequestOptions(messages, true), testEndpoint.model, testEndpoint));
const webSocketBody = instantiationService.invokeFunction(servicesAccessor => createResponsesRequestBody(servicesAccessor, { ...createRequestOptions(messages, true), conversationId: 'conv-1' }, testEndpoint.model, testEndpoint));

expect(webSocketBody.previous_response_id).toBe('resp-prev');
expect(webSocketBody.input).toContainEqual({
Expand All @@ -370,6 +377,42 @@ describe('createResponsesRequestBody', () => {
services.dispose();
});

it('sends all messages when the websocket stateful marker is not in the current messages', () => {
const services = createPlatformServices();
const wsManager = new NullChatWebSocketManager();
wsManager.getStatefulMarker = () => 'resp-stale';
services.set(IChatWebSocketManager, wsManager);
const accessor = services.createTestingAccessor();
const instantiationService = accessor.get(IInstantiationService);
const messages: Raw.ChatMessage[] = [
{
role: Raw.ChatRole.User,
content: [{ type: Raw.ChatCompletionContentPartKind.Text, text: 'first message' }],
},
createStatefulMarkerMessage(testEndpoint.model, 'resp-different'),
{
role: Raw.ChatRole.User,
content: [{ type: Raw.ChatCompletionContentPartKind.Text, text: 'second message' }],
},
];

const body = instantiationService.invokeFunction(servicesAccessor => createResponsesRequestBody(servicesAccessor, { ...createRequestOptions(messages, true), conversationId: 'conv-1' }, testEndpoint.model, testEndpoint));

expect(body.previous_response_id).toBeUndefined();
expect(body.input).toHaveLength(2);
expect(body.input?.[0]).toMatchObject({
role: 'user',
content: [{ type: 'input_text', text: 'first message' }],
});
expect(body.input?.[1]).toMatchObject({
role: 'user',
content: [{ type: 'input_text', text: 'second message' }],
});

accessor.dispose();
services.dispose();
});

it('includes the newest compaction item in non-websocket requests when it predates the stateful marker', () => {
const services = createPlatformServices();
const accessor = services.createTestingAccessor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ export interface IChatWebSocketManager {
*/
hasActiveConnection(conversationId: string): boolean;

/**
* Returns the stateful marker (last completed response ID) for the given
* conversation's active WebSocket connection, or undefined if there is
* no active connection or no marker yet.
*/
getStatefulMarker(conversationId: string): string | undefined;

/**
* Closes and removes the connection for a specific conversation.
*/
Expand All @@ -58,6 +65,7 @@ export class NullChatWebSocketManager implements IChatWebSocketManager {
throw new Error('WebSocket not available');
}
hasActiveConnection(_conversationId: string): boolean { return false; }
getStatefulMarker(_conversationId: string): string | undefined { return undefined; }
closeConnection(_conversationId: string): void { }
closeAll(): void { }
}
Expand Down Expand Up @@ -201,6 +209,11 @@ export class ChatWebSocketManager extends Disposable implements IChatWebSocketMa
return !!connection?.isOpen;
}

getStatefulMarker(conversationId: string): string | undefined {
const connection = this._connections.get(conversationId);
return connection?.isOpen ? connection.statefulMarker : undefined;
}

closeConnection(conversationId: string): void {
const connection = this._connections.get(conversationId);
if (connection) {
Expand Down
2 changes: 2 additions & 0 deletions extensions/copilot/src/platform/test/node/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import { IScopeSelector } from '../../scopeSelection/common/scopeSelection';
import { ISearchService } from '../../search/common/searchService';
import { ISimulationTestContext, NulSimulationTestContext } from '../../simulationTestContext/common/simulationTestContext';
import { ISnippyService, NullSnippyService } from '../../snippy/common/snippyService';
import { IChatWebSocketManager, NullChatWebSocketManager } from '../../networking/node/chatWebSocketManager';
import { ISurveyService, NullSurveyService } from '../../survey/common/surveyService';
import { ITabsAndEditorsService } from '../../tabs/common/tabsAndEditorsService';
import { ITasksService } from '../../tasks/common/tasksService';
Expand Down Expand Up @@ -278,6 +279,7 @@ export function createPlatformServices(disposables: Pick<DisposableStore, 'add'>
}
}));
testingServiceCollection.define(ISnippyService, new SyncDescriptor(NullSnippyService));
testingServiceCollection.define(IChatWebSocketManager, new SyncDescriptor(NullChatWebSocketManager));
testingServiceCollection.define(IInteractiveSessionService, new SyncDescriptor(class implements IInteractiveSessionService {
_serviceBrand: undefined;
async transferActiveChat(workspaceUri: Uri): Promise<void> {
Expand Down
Loading