From d481e0604a79470e2c1308827b3ecb78bfb5327e Mon Sep 17 00:00:00 2001 From: Alan B Date: Wed, 30 Apr 2025 16:06:44 -0600 Subject: [PATCH 1/7] feat: :construction: catch user transcription --- pyproject.toml | 2 +- src/google/adk/agents/run_config.py | 3 + src/google/adk/flows/llm_flows/basic.py | 3 + .../adk/models/gemini_llm_connection.py | 348 ++++++------ src/google/adk/models/google_llm.py | 501 +++++++++--------- 5 files changed, 434 insertions(+), 423 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 436db8c1..895a4105 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ dependencies = [ "google-cloud-secret-manager>=2.22.0", # Fetching secrets in RestAPI Tool "google-cloud-speech>=2.30.0", # For Audo Transcription "google-cloud-storage>=2.18.0, <3.0.0", # For GCS Artifact service - "google-genai>=1.11.0", # Google GenAI SDK + "google-genai>=1.12.1", # Google GenAI SDK "graphviz>=0.20.2", # Graphviz for graph rendering "mcp>=1.5.0;python_version>='3.10'", # For MCP Toolset "opentelemetry-api>=1.31.0", # OpenTelemetry diff --git a/src/google/adk/agents/run_config.py b/src/google/adk/agents/run_config.py index 17dff29c..bac68471 100644 --- a/src/google/adk/agents/run_config.py +++ b/src/google/adk/agents/run_config.py @@ -64,6 +64,9 @@ class RunConfig(BaseModel): output_audio_transcription: Optional[types.AudioTranscriptionConfig] = None """Output transcription for live agents with audio response.""" + input_audio_transcription: Optional[types.AudioTranscriptionConfig] = None + """Input transcription for live agents with audio input from user.""" + max_llm_calls: int = 500 """ A limit on the total number of llm calls for a given run. diff --git a/src/google/adk/flows/llm_flows/basic.py b/src/google/adk/flows/llm_flows/basic.py index 278b4cf3..d48c8cd2 100644 --- a/src/google/adk/flows/llm_flows/basic.py +++ b/src/google/adk/flows/llm_flows/basic.py @@ -62,6 +62,9 @@ async def run_async( llm_request.live_connect_config.output_audio_transcription = ( invocation_context.run_config.output_audio_transcription ) + llm_request.live_connect_config.input_audio_transcription = ( + invocation_context.run_config.input_audio_transcription + ) # TODO: handle tool append here, instead of in BaseTool.process_llm_request. diff --git a/src/google/adk/models/gemini_llm_connection.py b/src/google/adk/models/gemini_llm_connection.py index 30f1fb24..9c524729 100644 --- a/src/google/adk/models/gemini_llm_connection.py +++ b/src/google/adk/models/gemini_llm_connection.py @@ -25,176 +25,182 @@ class GeminiLlmConnection(BaseLlmConnection): - """The Gemini model connection.""" - - def __init__(self, gemini_session: live.AsyncSession): - self._gemini_session = gemini_session - - async def send_history(self, history: list[types.Content]): - """Sends the conversation history to the gemini model. - - You call this method right after setting up the model connection. - The model will respond if the last content is from user, otherwise it will - wait for new user input before responding. - - Args: - history: The conversation history to send to the model. - """ - - # TODO: Remove this filter and translate unary contents to streaming - # contents properly. 
- - # We ignore any audio from user during the agent transfer phase - contents = [ - content - for content in history - if content.parts and content.parts[0].text - ] - - if contents: - await self._gemini_session.send( - input=types.LiveClientContent( - turns=contents, - turn_complete=contents[-1].role == 'user', - ), - ) - else: - logger.info('no content is sent') - - async def send_content(self, content: types.Content): - """Sends a user content to the gemini model. - - The model will respond immediately upon receiving the content. - If you send function responses, all parts in the content should be function - responses. - - Args: - content: The content to send to the model. - """ - - assert content.parts - if content.parts[0].function_response: - # All parts have to be function responses. - function_responses = [part.function_response for part in content.parts] - logger.debug('Sending LLM function response: %s', function_responses) - await self._gemini_session.send( - input=types.LiveClientToolResponse( - function_responses=function_responses - ), - ) - else: - logger.debug('Sending LLM new content %s', content) - await self._gemini_session.send( - input=types.LiveClientContent( - turns=[content], - turn_complete=True, - ) - ) - - async def send_realtime(self, blob: types.Blob): - """Sends a chunk of audio or a frame of video to the model in realtime. - - Args: - blob: The blob to send to the model. - """ - - input_blob = blob.model_dump() - logger.debug('Sending LLM Blob: %s', input_blob) - await self._gemini_session.send(input=input_blob) - - def __build_full_text_response(self, text: str): - """Builds a full text response. - - The text should not partial and the returned LlmResponse is not be - partial. - - Args: - text: The text to be included in the response. - - Returns: - An LlmResponse containing the full text. - """ - return LlmResponse( - content=types.Content( - role='model', - parts=[types.Part.from_text(text=text)], - ), - ) - - async def receive(self) -> AsyncGenerator[LlmResponse, None]: - """Receives the model response using the llm server connection. - - Yields: - LlmResponse: The model response. - """ - - text = '' - async for message in self._gemini_session.receive(): - logger.debug('Got LLM Live message: %s', message) - if message.server_content: - content = message.server_content.model_turn - if content and content.parts: - llm_response = LlmResponse( - content=content, interrupted=message.server_content.interrupted - ) - if content.parts[0].text: - text += content.parts[0].text - llm_response.partial = True - # don't yield the merged text event when receiving audio data - elif text and not content.parts[0].inline_data: - yield self.__build_full_text_response(text) - text = '' - yield llm_response - - if ( - message.server_content.output_transcription - and message.server_content.output_transcription.text - ): - # TODO: Right now, we just support output_transcription without - # changing interface and data protocol. Later, we can consider to - # support output_transcription as a separate field in LlmResponse. - - # Transcription is always considered as partial event - # We rely on other control signals to determine when to yield the - # full text response(turn_complete, interrupted, or tool_call). 
- text += message.server_content.output_transcription.text - parts = [ - types.Part.from_text( - text=message.server_content.output_transcription.text - ) - ] - llm_response = LlmResponse( - content=types.Content(role='model', parts=parts), partial=True - ) - yield llm_response - - if message.server_content.turn_complete: - if text: - yield self.__build_full_text_response(text) - text = '' - yield LlmResponse( - turn_complete=True, interrupted=message.server_content.interrupted - ) - break - # in case of empty content or parts, we sill surface it - # in case it's an interrupted message, we merge the previous partial - # text. Other we don't merge. because content can be none when model - # safety threshold is triggered - if message.server_content.interrupted and text: - yield self.__build_full_text_response(text) - text = '' - yield LlmResponse(interrupted=message.server_content.interrupted) - if message.tool_call: - if text: - yield self.__build_full_text_response(text) - text = '' - parts = [ - types.Part(function_call=function_call) - for function_call in message.tool_call.function_calls - ] - yield LlmResponse(content=types.Content(role='model', parts=parts)) + """The Gemini model connection.""" + + def __init__(self, gemini_session: live.AsyncSession): + self._gemini_session = gemini_session + + async def send_history(self, history: list[types.Content]): + """Sends the conversation history to the gemini model. + + You call this method right after setting up the model connection. + The model will respond if the last content is from user, otherwise it will + wait for new user input before responding. - async def close(self): - """Closes the llm server connection.""" + Args: + history: The conversation history to send to the model. + """ + + # TODO: Remove this filter and translate unary contents to streaming + # contents properly. + + # We ignore any audio from user during the agent transfer phase + contents = [ + content for content in history if content.parts and content.parts[0].text + ] - await self._gemini_session.close() + if contents: + await self._gemini_session.send( + input=types.LiveClientContent( + turns=contents, + turn_complete=contents[-1].role == "user", + ), + ) + else: + logger.info("no content is sent") + + async def send_content(self, content: types.Content): + """Sends a user content to the gemini model. + + The model will respond immediately upon receiving the content. + If you send function responses, all parts in the content should be function + responses. + + Args: + content: The content to send to the model. + """ + + assert content.parts + if content.parts[0].function_response: + # All parts have to be function responses. + function_responses = [part.function_response for part in content.parts] + logger.debug("Sending LLM function response: %s", function_responses) + await self._gemini_session.send( + input=types.LiveClientToolResponse( + function_responses=function_responses + ), + ) + else: + logger.debug("Sending LLM new content %s", content) + await self._gemini_session.send( + input=types.LiveClientContent( + turns=[content], + turn_complete=True, + ) + ) + + async def send_realtime(self, blob: types.Blob): + """Sends a chunk of audio or a frame of video to the model in realtime. + + Args: + blob: The blob to send to the model. + """ + + input_blob = blob.model_dump() + logger.debug("Sending LLM Blob: %s", input_blob) + await self._gemini_session.send(input=input_blob) + + def __build_full_text_response(self, text: str): + """Builds a full text response. 
+ + The text should not partial and the returned LlmResponse is not be + partial. + + Args: + text: The text to be included in the response. + + Returns: + An LlmResponse containing the full text. + """ + return LlmResponse( + content=types.Content( + role="model", + parts=[types.Part.from_text(text=text)], + ), + ) + + async def receive(self) -> AsyncGenerator[LlmResponse, None]: + """Receives the model response using the llm server connection. + + Yields: + LlmResponse: The model response. + """ + + text = "" + async for message in self._gemini_session.receive(): + logger.debug("Got LLM Live message: %s", message) + + if message.server_content: + content = message.server_content.model_turn + if content and content.parts: + llm_response = LlmResponse( + content=content, interrupted=message.server_content.interrupted + ) + if content.parts[0].text: + text += content.parts[0].text + llm_response.partial = True + # don't yield the merged text event when receiving audio data + elif text and not content.parts[0].inline_data: + yield self.__build_full_text_response(text) + text = "" + yield llm_response + if ( + message.server_content.input_transcription + and message.server_content.input_transcription.text + ): + print(message.server_content.input_transcription.text) + if ( + message.server_content.output_transcription + and message.server_content.output_transcription.text + ): + # TODO: Right now, we just support output_transcription without + # changing interface and data protocol. Later, we can consider to + # support output_transcription as a separate field in LlmResponse. + + # Transcription is always considered as partial event + # We rely on other control signals to determine when to yield the + # full text response(turn_complete, interrupted, or tool_call). + text += message.server_content.output_transcription.text + parts = [ + types.Part.from_text( + text=message.server_content.output_transcription.text + ) + ] + llm_response = LlmResponse( + content=types.Content(role="model", parts=parts), partial=True + ) + yield llm_response + + if message.server_content.turn_complete: + if text: + yield self.__build_full_text_response(text) + text = "" + yield LlmResponse( + turn_complete=True, + interrupted=message.server_content.interrupted, + ) + break + # in case of empty content or parts, we sill surface it + # in case it's an interrupted message, we merge the previous partial + # text. Other we don't merge. 
because content can be none when model + # safety threshold is triggered + if message.server_content.interrupted and text: + yield self.__build_full_text_response(text) + text = "" + yield LlmResponse(interrupted=message.server_content.interrupted) + if message.tool_call: + if text: + yield self.__build_full_text_response(text) + text = "" + parts = [ + types.Part(function_call=function_call) + for function_call in message.tool_call.function_calls + ] + yield LlmResponse(content=types.Content(role="model", parts=parts)) + else: + pass + + async def close(self): + """Closes the llm server connection.""" + + await self._gemini_session.close() diff --git a/src/google/adk/models/google_llm.py b/src/google/adk/models/google_llm.py index 29988dfc..7fd66b04 100644 --- a/src/google/adk/models/google_llm.py +++ b/src/google/adk/models/google_llm.py @@ -32,268 +32,267 @@ from .llm_response import LlmResponse if TYPE_CHECKING: - from .llm_request import LlmRequest + from .llm_request import LlmRequest logger = logging.getLogger(__name__) -_NEW_LINE = '\n' -_EXCLUDED_PART_FIELD = {'inline_data': {'data'}} +_NEW_LINE = "\n" +_EXCLUDED_PART_FIELD = {"inline_data": {"data"}} class Gemini(BaseLlm): - """Integration for Gemini models. + """Integration for Gemini models. - Attributes: - model: The name of the Gemini model. - """ - - model: str = 'gemini-1.5-flash' - - @staticmethod - @override - def supported_models() -> list[str]: - """Provides the list of supported models. - - Returns: - A list of supported models. - """ - - return [ - r'gemini-.*', - # fine-tuned vertex endpoint pattern - r'projects\/.+\/locations\/.+\/endpoints\/.+', - # vertex gemini long name - r'projects\/.+\/locations\/.+\/publishers\/google\/models\/gemini.+', - ] - - async def generate_content_async( - self, llm_request: LlmRequest, stream: bool = False - ) -> AsyncGenerator[LlmResponse, None]: - """Sends a request to the Gemini model. - - Args: - llm_request: LlmRequest, the request to send to the Gemini model. - stream: bool = False, whether to do streaming call. - - Yields: - LlmResponse: The model response. + Attributes: + model: The name of the Gemini model. """ - self._maybe_append_user_content(llm_request) - logger.info( - 'Sending out request, model: %s, backend: %s, stream: %s', - llm_request.model, - self._api_backend, - stream, - ) - logger.info(_build_request_log(llm_request)) - - if stream: - responses = await self.api_client.aio.models.generate_content_stream( - model=llm_request.model, - contents=llm_request.contents, - config=llm_request.config, - ) - response = None - text = '' - # for sse, similar as bidi (see receive method in gemini_llm_connecton.py), - # we need to mark those text content as partial and after all partial - # contents are sent, we send an accumulated event which contains all the - # previous partial content. The only difference is bidi rely on - # complete_turn flag to detect end while sse depends on finish_reason. 
- async for response in responses: - logger.info(_build_response_log(response)) - llm_response = LlmResponse.create(response) - if ( - llm_response.content - and llm_response.content.parts - and llm_response.content.parts[0].text - ): - text += llm_response.content.parts[0].text - llm_response.partial = True - elif text and ( - not llm_response.content - or not llm_response.content.parts - # don't yield the merged text event when receiving audio data - or not llm_response.content.parts[0].inline_data - ): - yield LlmResponse( - content=types.ModelContent( - parts=[types.Part.from_text(text=text)], - ), - ) - text = '' - yield llm_response - if ( - text - and response - and response.candidates - and response.candidates[0].finish_reason == types.FinishReason.STOP - ): - yield LlmResponse( - content=types.ModelContent( - parts=[types.Part.from_text(text=text)], - ), + model: str = "gemini-2.0-flash-live-preview-04-09" + + @staticmethod + @override + def supported_models() -> list[str]: + """Provides the list of supported models. + + Returns: + A list of supported models. + """ + + return [ + r"gemini-.*", + # fine-tuned vertex endpoint pattern + r"projects\/.+\/locations\/.+\/endpoints\/.+", + # vertex gemini long name + r"projects\/.+\/locations\/.+\/publishers\/google\/models\/gemini.+", + ] + + async def generate_content_async( + self, llm_request: LlmRequest, stream: bool = False + ) -> AsyncGenerator[LlmResponse, None]: + """Sends a request to the Gemini model. + + Args: + llm_request: LlmRequest, the request to send to the Gemini model. + stream: bool = False, whether to do streaming call. + + Yields: + LlmResponse: The model response. + """ + + self._maybe_append_user_content(llm_request) + logger.info( + "Sending out request, model: %s, backend: %s, stream: %s", + llm_request.model, + self._api_backend, + stream, ) - - else: - response = await self.api_client.aio.models.generate_content( - model=llm_request.model, - contents=llm_request.contents, - config=llm_request.config, - ) - logger.info(_build_response_log(response)) - yield LlmResponse.create(response) - - @cached_property - def api_client(self) -> Client: - """Provides the api client. - - Returns: - The api client. - """ - return Client( - http_options=types.HttpOptions(headers=self._tracking_headers) - ) - - @cached_property - def _api_backend(self) -> str: - return 'vertex' if self.api_client.vertexai else 'ml_dev' - - @cached_property - def _tracking_headers(self) -> dict[str, str]: - framework_label = f'google-adk/{version.__version__}' - language_label = 'gl-python/' + sys.version.split()[0] - version_header_value = f'{framework_label} {language_label}' - tracking_headers = { - 'x-goog-api-client': version_header_value, - 'user-agent': version_header_value, - } - return tracking_headers - - @cached_property - def _live_api_client(self) -> Client: - if self._api_backend == 'vertex': - # use default api version for vertex - return Client( - http_options=types.HttpOptions(headers=self._tracking_headers) - ) - else: - # use v1alpha for ml_dev - api_version = 'v1alpha' - return Client( - http_options=types.HttpOptions( - headers=self._tracking_headers, api_version=api_version - ) - ) - - @contextlib.asynccontextmanager - async def connect(self, llm_request: LlmRequest) -> BaseLlmConnection: - """Connects to the Gemini model and returns an llm connection. - - Args: - llm_request: LlmRequest, the request to send to the Gemini model. - - Yields: - BaseLlmConnection, the connection to the Gemini model. 
- """ - - llm_request.live_connect_config.system_instruction = types.Content( - role='system', - parts=[ - types.Part.from_text(text=llm_request.config.system_instruction) - ], - ) - llm_request.live_connect_config.tools = llm_request.config.tools - async with self._live_api_client.aio.live.connect( - model=llm_request.model, config=llm_request.live_connect_config - ) as live_session: - yield GeminiLlmConnection(live_session) - - def _maybe_append_user_content(self, llm_request: LlmRequest): - """Appends a user content, so that model can continue to output. - - Args: - llm_request: LlmRequest, the request to send to the Gemini model. - """ - # If no content is provided, append a user content to hint model response - # using system instruction. - if not llm_request.contents: - llm_request.contents.append( - types.Content( - role='user', - parts=[ - types.Part( - text=( - 'Handle the requests as specified in the System' - ' Instruction.' - ) - ) - ], - ) - ) - return - - # Insert a user content to preserve user intent and to avoid empty - # model response. - if llm_request.contents[-1].role != 'user': - llm_request.contents.append( - types.Content( - role='user', - parts=[ - types.Part( - text=( - 'Continue processing previous requests as instructed.' - ' Exit or provide a summary if no more outputs are' - ' needed.' - ) - ) - ], - ) - ) + logger.info(_build_request_log(llm_request)) + + if stream: + responses = await self.api_client.aio.models.generate_content_stream( + model=llm_request.model, + contents=llm_request.contents, + config=llm_request.config, + ) + response = None + text = "" + # for sse, similar as bidi (see receive method in gemini_llm_connecton.py), + # we need to mark those text content as partial and after all partial + # contents are sent, we send an accumulated event which contains all the + # previous partial content. The only difference is bidi rely on + # complete_turn flag to detect end while sse depends on finish_reason. + async for response in responses: + logger.info(_build_response_log(response)) + llm_response = LlmResponse.create(response) + if ( + llm_response.content + and llm_response.content.parts + and llm_response.content.parts[0].text + ): + text += llm_response.content.parts[0].text + llm_response.partial = True + elif text and ( + not llm_response.content + or not llm_response.content.parts + # don't yield the merged text event when receiving audio data + or not llm_response.content.parts[0].inline_data + ): + yield LlmResponse( + content=types.ModelContent( + parts=[types.Part.from_text(text=text)], + ), + ) + text = "" + yield llm_response + if ( + text + and response + and response.candidates + and response.candidates[0].finish_reason == types.FinishReason.STOP + ): + yield LlmResponse( + content=types.ModelContent( + parts=[types.Part.from_text(text=text)], + ), + ) + + else: + response = await self.api_client.aio.models.generate_content( + model=llm_request.model, + contents=llm_request.contents, + config=llm_request.config, + ) + logger.info(_build_response_log(response)) + yield LlmResponse.create(response) + + @cached_property + def api_client(self) -> Client: + """Provides the api client. + + Returns: + The api client. 
+ """ + return Client(http_options=types.HttpOptions(headers=self._tracking_headers)) + + @cached_property + def _api_backend(self) -> str: + return "vertex" if self.api_client.vertexai else "ml_dev" + + @cached_property + def _tracking_headers(self) -> dict[str, str]: + framework_label = f"google-adk/{version.__version__}" + language_label = "gl-python/" + sys.version.split()[0] + version_header_value = f"{framework_label} {language_label}" + tracking_headers = { + "x-goog-api-client": version_header_value, + "user-agent": version_header_value, + } + return tracking_headers + + @cached_property + def _live_api_client(self) -> Client: + print(f"Using live api client for backend: {self._api_backend}") + if self._api_backend == "vertex": + # use default api version for vertex + return Client( + http_options=types.HttpOptions( + api_version="v1beta1", headers=self._tracking_headers + ) + ) + else: + # use v1alpha for ml_dev + api_version = "v1beta1" + return Client( + http_options=types.HttpOptions( + headers=self._tracking_headers, api_version=api_version + ) + ) + + @contextlib.asynccontextmanager + async def connect(self, llm_request: LlmRequest) -> BaseLlmConnection: + """Connects to the Gemini model and returns an llm connection. + + Args: + llm_request: LlmRequest, the request to send to the Gemini model. + + Yields: + BaseLlmConnection, the connection to the Gemini model. + """ + + llm_request.live_connect_config.system_instruction = types.Content( + role="system", + parts=[types.Part.from_text(text=llm_request.config.system_instruction)], + ) + llm_request.live_connect_config.tools = llm_request.config.tools + print(f"live_config:{llm_request.live_connect_config}") + print(llm_request.model) + async with self._live_api_client.aio.live.connect( + model="gemini-2.0-flash-live-preview-04-09", + config=llm_request.live_connect_config, + ) as live_session: + yield GeminiLlmConnection(live_session) + + def _maybe_append_user_content(self, llm_request: LlmRequest): + """Appends a user content, so that model can continue to output. + + Args: + llm_request: LlmRequest, the request to send to the Gemini model. + """ + # If no content is provided, append a user content to hint model response + # using system instruction. + if not llm_request.contents: + llm_request.contents.append( + types.Content( + role="user", + parts=[ + types.Part( + text=( + "Handle the requests as specified in the System" + " Instruction." + ) + ) + ], + ) + ) + return + + # Insert a user content to preserve user intent and to avoid empty + # model response. + if llm_request.contents[-1].role != "user": + llm_request.contents.append( + types.Content( + role="user", + parts=[ + types.Part( + text=( + "Continue processing previous requests as instructed." + " Exit or provide a summary if no more outputs are" + " needed." 
+ ) + ) + ], + ) + ) def _build_function_declaration_log( func_decl: types.FunctionDeclaration, ) -> str: - param_str = '{}' - if func_decl.parameters and func_decl.parameters.properties: - param_str = str({ - k: v.model_dump(exclude_none=True) - for k, v in func_decl.parameters.properties.items() - }) - return_str = 'None' - if func_decl.response: - return_str = str(func_decl.response.model_dump(exclude_none=True)) - return f'{func_decl.name}: {param_str} -> {return_str}' + param_str = "{}" + if func_decl.parameters and func_decl.parameters.properties: + param_str = str( + { + k: v.model_dump(exclude_none=True) + for k, v in func_decl.parameters.properties.items() + } + ) + return_str = "None" + if func_decl.response: + return_str = str(func_decl.response.model_dump(exclude_none=True)) + return f"{func_decl.name}: {param_str} -> {return_str}" def _build_request_log(req: LlmRequest) -> str: - function_decls: list[types.FunctionDeclaration] = cast( - list[types.FunctionDeclaration], - req.config.tools[0].function_declarations if req.config.tools else [], - ) - function_logs = ( - [ - _build_function_declaration_log(func_decl) - for func_decl in function_decls - ] - if function_decls - else [] - ) - contents_logs = [ - content.model_dump_json( - exclude_none=True, - exclude={ - 'parts': { - i: _EXCLUDED_PART_FIELD for i in range(len(content.parts)) - } - }, - ) - for content in req.contents - ] - - return f""" + function_decls: list[types.FunctionDeclaration] = cast( + list[types.FunctionDeclaration], + req.config.tools[0].function_declarations if req.config.tools else [], + ) + function_logs = ( + [_build_function_declaration_log(func_decl) for func_decl in function_decls] + if function_decls + else [] + ) + contents_logs = [ + content.model_dump_json( + exclude_none=True, + exclude={ + "parts": {i: _EXCLUDED_PART_FIELD for i in range(len(content.parts))} + }, + ) + for content in req.contents + ] + + return f""" LLM Request: ----------------------------------------------------------- System Instruction: @@ -309,13 +308,13 @@ def _build_request_log(req: LlmRequest) -> str: def _build_response_log(resp: types.GenerateContentResponse) -> str: - function_calls_text = [] - if function_calls := resp.function_calls: - for func_call in function_calls: - function_calls_text.append( - f'name: {func_call.name}, args: {func_call.args}' - ) - return f""" + function_calls_text = [] + if function_calls := resp.function_calls: + for func_call in function_calls: + function_calls_text.append( + f"name: {func_call.name}, args: {func_call.args}" + ) + return f""" LLM Response: ----------------------------------------------------------- Text: From bba436bb76d1d2f9d5ba969fce38ff8b8a443254 Mon Sep 17 00:00:00 2001 From: Alan B Date: Wed, 30 Apr 2025 16:38:02 -0600 Subject: [PATCH 2/7] feat: :sparkles: send user transcription event as llm_response --- src/google/adk/models/gemini_llm_connection.py | 11 ++++++++++- src/google/adk/models/google_llm.py | 4 +--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/google/adk/models/gemini_llm_connection.py b/src/google/adk/models/gemini_llm_connection.py index 9c524729..d7630307 100644 --- a/src/google/adk/models/gemini_llm_connection.py +++ b/src/google/adk/models/gemini_llm_connection.py @@ -148,7 +148,16 @@ async def receive(self) -> AsyncGenerator[LlmResponse, None]: message.server_content.input_transcription and message.server_content.input_transcription.text ): - print(message.server_content.input_transcription.text) + user_text = 
message.server_content.input_transcription.text + parts = [ + types.Part.from_text( + text=user_text, + ) + ] + llm_response = LlmResponse( + content=types.Content(role="user", parts=parts) + ) + yield llm_response if ( message.server_content.output_transcription and message.server_content.output_transcription.text diff --git a/src/google/adk/models/google_llm.py b/src/google/adk/models/google_llm.py index 7fd66b04..323ccbbf 100644 --- a/src/google/adk/models/google_llm.py +++ b/src/google/adk/models/google_llm.py @@ -204,10 +204,8 @@ async def connect(self, llm_request: LlmRequest) -> BaseLlmConnection: parts=[types.Part.from_text(text=llm_request.config.system_instruction)], ) llm_request.live_connect_config.tools = llm_request.config.tools - print(f"live_config:{llm_request.live_connect_config}") - print(llm_request.model) async with self._live_api_client.aio.live.connect( - model="gemini-2.0-flash-live-preview-04-09", + model=llm_request.model, config=llm_request.live_connect_config, ) as live_session: yield GeminiLlmConnection(live_session) From ad2abf540c60895b79c50f9051a6289ce394b98d Mon Sep 17 00:00:00 2001 From: Alan B Date: Thu, 1 May 2025 22:43:22 -0600 Subject: [PATCH 3/7] style: :lipstick: update lint problems --- .../adk/models/gemini_llm_connection.py | 368 +++++++------ src/google/adk/models/google_llm.py | 500 +++++++++--------- 2 files changed, 435 insertions(+), 433 deletions(-) diff --git a/src/google/adk/models/gemini_llm_connection.py b/src/google/adk/models/gemini_llm_connection.py index d7630307..40189758 100644 --- a/src/google/adk/models/gemini_llm_connection.py +++ b/src/google/adk/models/gemini_llm_connection.py @@ -25,191 +25,189 @@ class GeminiLlmConnection(BaseLlmConnection): - """The Gemini model connection.""" - - def __init__(self, gemini_session: live.AsyncSession): - self._gemini_session = gemini_session - - async def send_history(self, history: list[types.Content]): - """Sends the conversation history to the gemini model. - - You call this method right after setting up the model connection. - The model will respond if the last content is from user, otherwise it will - wait for new user input before responding. - - Args: - history: The conversation history to send to the model. - """ - - # TODO: Remove this filter and translate unary contents to streaming - # contents properly. - - # We ignore any audio from user during the agent transfer phase - contents = [ - content for content in history if content.parts and content.parts[0].text - ] - - if contents: - await self._gemini_session.send( - input=types.LiveClientContent( - turns=contents, - turn_complete=contents[-1].role == "user", - ), - ) - else: - logger.info("no content is sent") - - async def send_content(self, content: types.Content): - """Sends a user content to the gemini model. - - The model will respond immediately upon receiving the content. - If you send function responses, all parts in the content should be function - responses. - - Args: - content: The content to send to the model. - """ - - assert content.parts - if content.parts[0].function_response: - # All parts have to be function responses. 
- function_responses = [part.function_response for part in content.parts] - logger.debug("Sending LLM function response: %s", function_responses) - await self._gemini_session.send( - input=types.LiveClientToolResponse( - function_responses=function_responses - ), - ) - else: - logger.debug("Sending LLM new content %s", content) - await self._gemini_session.send( - input=types.LiveClientContent( - turns=[content], - turn_complete=True, + """The Gemini model connection.""" + + def __init__(self, gemini_session: live.AsyncSession): + self._gemini_session = gemini_session + + async def send_history(self, history: list[types.Content]): + """Sends the conversation history to the gemini model. + + You call this method right after setting up the model connection. + The model will respond if the last content is from user, otherwise it will + wait for new user input before responding. + + Args: + history: The conversation history to send to the model. + """ + + # TODO: Remove this filter and translate unary contents to streaming + # contents properly. + + # We ignore any audio from user during the agent transfer phase + contents = [ + content + for content in history + if content.parts and content.parts[0].text + ] + + if contents: + await self._gemini_session.send( + input=types.LiveClientContent( + turns=contents, + turn_complete=contents[-1].role == 'user', + ), + ) + else: + logger.info('no content is sent') + + async def send_content(self, content: types.Content): + """Sends a user content to the gemini model. + + The model will respond immediately upon receiving the content. + If you send function responses, all parts in the content should be function + responses. + + Args: + content: The content to send to the model. + """ + + assert content.parts + if content.parts[0].function_response: + # All parts have to be function responses. + function_responses = [part.function_response for part in content.parts] + logger.debug('Sending LLM function response: %s', function_responses) + await self._gemini_session.send( + input=types.LiveClientToolResponse( + function_responses=function_responses + ), + ) + else: + logger.debug('Sending LLM new content %s', content) + await self._gemini_session.send( + input=types.LiveClientContent( + turns=[content], + turn_complete=True, + ) + ) + + async def send_realtime(self, blob: types.Blob): + """Sends a chunk of audio or a frame of video to the model in realtime. + + Args: + blob: The blob to send to the model. + """ + + input_blob = blob.model_dump() + logger.debug('Sending LLM Blob: %s', input_blob) + await self._gemini_session.send(input=input_blob) + + def __build_full_text_response(self, text: str): + """Builds a full text response. + + The text should not partial and the returned LlmResponse is not be + partial. + + Args: + text: The text to be included in the response. + + Returns: + An LlmResponse containing the full text. + """ + return LlmResponse( + content=types.Content( + role='model', + parts=[types.Part.from_text(text=text)], + ), + ) + + async def receive(self) -> AsyncGenerator[LlmResponse, None]: + """Receives the model response using the llm server connection. + + Yields: + LlmResponse: The model response. 
+ """ + + text = '' + async for message in self._gemini_session.receive(): + logger.debug('Got LLM Live message: %s', message) + if message.server_content: + content = message.server_content.model_turn + if content and content.parts: + llm_response = LlmResponse( + content=content, interrupted=message.server_content.interrupted + ) + if content.parts[0].text: + text += content.parts[0].text + llm_response.partial = True + # don't yield the merged text event when receiving audio data + elif text and not content.parts[0].inline_data: + yield self.__build_full_text_response(text) + text = '' + yield llm_response + if ( + message.server_content.input_transcription + and message.server_content.input_transcription.text + ): + user_text = message.server_content.input_transcription.text + parts = [ + types.Part.from_text( + text=user_text, ) + ] + llm_response = LlmResponse( + content=types.Content(role='user', parts=parts) ) + yield llm_response + if ( + message.server_content.output_transcription + and message.server_content.output_transcription.text + ): + # TODO: Right now, we just support output_transcription without + # changing interface and data protocol. Later, we can consider to + # support output_transcription as a separate field in LlmResponse. + + # Transcription is always considered as partial event + # We rely on other control signals to determine when to yield the + # full text response(turn_complete, interrupted, or tool_call). + text += message.server_content.output_transcription.text + parts = [ + types.Part.from_text( + text=message.server_content.output_transcription.text + ) + ] + llm_response = LlmResponse( + content=types.Content(role='model', parts=parts), partial=True + ) + yield llm_response + + if message.server_content.turn_complete: + if text: + yield self.__build_full_text_response(text) + text = '' + yield LlmResponse( + turn_complete=True, interrupted=message.server_content.interrupted + ) + break + # in case of empty content or parts, we sill surface it + # in case it's an interrupted message, we merge the previous partial + # text. Other we don't merge. because content can be none when model + # safety threshold is triggered + if message.server_content.interrupted and text: + yield self.__build_full_text_response(text) + text = '' + yield LlmResponse(interrupted=message.server_content.interrupted) + if message.tool_call: + if text: + yield self.__build_full_text_response(text) + text = '' + parts = [ + types.Part(function_call=function_call) + for function_call in message.tool_call.function_calls + ] + yield LlmResponse(content=types.Content(role='model', parts=parts)) + + async def close(self): + """Closes the llm server connection.""" - async def send_realtime(self, blob: types.Blob): - """Sends a chunk of audio or a frame of video to the model in realtime. - - Args: - blob: The blob to send to the model. - """ - - input_blob = blob.model_dump() - logger.debug("Sending LLM Blob: %s", input_blob) - await self._gemini_session.send(input=input_blob) - - def __build_full_text_response(self, text: str): - """Builds a full text response. - - The text should not partial and the returned LlmResponse is not be - partial. - - Args: - text: The text to be included in the response. - - Returns: - An LlmResponse containing the full text. 
- """ - return LlmResponse( - content=types.Content( - role="model", - parts=[types.Part.from_text(text=text)], - ), - ) - - async def receive(self) -> AsyncGenerator[LlmResponse, None]: - """Receives the model response using the llm server connection. - - Yields: - LlmResponse: The model response. - """ - - text = "" - async for message in self._gemini_session.receive(): - logger.debug("Got LLM Live message: %s", message) - - if message.server_content: - content = message.server_content.model_turn - if content and content.parts: - llm_response = LlmResponse( - content=content, interrupted=message.server_content.interrupted - ) - if content.parts[0].text: - text += content.parts[0].text - llm_response.partial = True - # don't yield the merged text event when receiving audio data - elif text and not content.parts[0].inline_data: - yield self.__build_full_text_response(text) - text = "" - yield llm_response - if ( - message.server_content.input_transcription - and message.server_content.input_transcription.text - ): - user_text = message.server_content.input_transcription.text - parts = [ - types.Part.from_text( - text=user_text, - ) - ] - llm_response = LlmResponse( - content=types.Content(role="user", parts=parts) - ) - yield llm_response - if ( - message.server_content.output_transcription - and message.server_content.output_transcription.text - ): - # TODO: Right now, we just support output_transcription without - # changing interface and data protocol. Later, we can consider to - # support output_transcription as a separate field in LlmResponse. - - # Transcription is always considered as partial event - # We rely on other control signals to determine when to yield the - # full text response(turn_complete, interrupted, or tool_call). - text += message.server_content.output_transcription.text - parts = [ - types.Part.from_text( - text=message.server_content.output_transcription.text - ) - ] - llm_response = LlmResponse( - content=types.Content(role="model", parts=parts), partial=True - ) - yield llm_response - - if message.server_content.turn_complete: - if text: - yield self.__build_full_text_response(text) - text = "" - yield LlmResponse( - turn_complete=True, - interrupted=message.server_content.interrupted, - ) - break - # in case of empty content or parts, we sill surface it - # in case it's an interrupted message, we merge the previous partial - # text. Other we don't merge. 
because content can be none when model - # safety threshold is triggered - if message.server_content.interrupted and text: - yield self.__build_full_text_response(text) - text = "" - yield LlmResponse(interrupted=message.server_content.interrupted) - if message.tool_call: - if text: - yield self.__build_full_text_response(text) - text = "" - parts = [ - types.Part(function_call=function_call) - for function_call in message.tool_call.function_calls - ] - yield LlmResponse(content=types.Content(role="model", parts=parts)) - else: - pass - - async def close(self): - """Closes the llm server connection.""" - - await self._gemini_session.close() + await self._gemini_session.close() diff --git a/src/google/adk/models/google_llm.py b/src/google/adk/models/google_llm.py index 323ccbbf..119d8202 100644 --- a/src/google/adk/models/google_llm.py +++ b/src/google/adk/models/google_llm.py @@ -32,265 +32,269 @@ from .llm_response import LlmResponse if TYPE_CHECKING: - from .llm_request import LlmRequest + from .llm_request import LlmRequest logger = logging.getLogger(__name__) -_NEW_LINE = "\n" -_EXCLUDED_PART_FIELD = {"inline_data": {"data"}} +_NEW_LINE = '\n' +_EXCLUDED_PART_FIELD = {'inline_data': {'data'}} class Gemini(BaseLlm): - """Integration for Gemini models. + """Integration for Gemini models. - Attributes: - model: The name of the Gemini model. + Attributes: + model: The name of the Gemini model. + """ + + model: str = 'gemini-1.5-flash' + + @staticmethod + @override + def supported_models() -> list[str]: + """Provides the list of supported models. + + Returns: + A list of supported models. """ - model: str = "gemini-2.0-flash-live-preview-04-09" - - @staticmethod - @override - def supported_models() -> list[str]: - """Provides the list of supported models. - - Returns: - A list of supported models. - """ - - return [ - r"gemini-.*", - # fine-tuned vertex endpoint pattern - r"projects\/.+\/locations\/.+\/endpoints\/.+", - # vertex gemini long name - r"projects\/.+\/locations\/.+\/publishers\/google\/models\/gemini.+", - ] - - async def generate_content_async( - self, llm_request: LlmRequest, stream: bool = False - ) -> AsyncGenerator[LlmResponse, None]: - """Sends a request to the Gemini model. - - Args: - llm_request: LlmRequest, the request to send to the Gemini model. - stream: bool = False, whether to do streaming call. - - Yields: - LlmResponse: The model response. - """ - - self._maybe_append_user_content(llm_request) - logger.info( - "Sending out request, model: %s, backend: %s, stream: %s", - llm_request.model, - self._api_backend, - stream, - ) - logger.info(_build_request_log(llm_request)) - - if stream: - responses = await self.api_client.aio.models.generate_content_stream( - model=llm_request.model, - contents=llm_request.contents, - config=llm_request.config, - ) - response = None - text = "" - # for sse, similar as bidi (see receive method in gemini_llm_connecton.py), - # we need to mark those text content as partial and after all partial - # contents are sent, we send an accumulated event which contains all the - # previous partial content. The only difference is bidi rely on - # complete_turn flag to detect end while sse depends on finish_reason. 
- async for response in responses: - logger.info(_build_response_log(response)) - llm_response = LlmResponse.create(response) - if ( - llm_response.content - and llm_response.content.parts - and llm_response.content.parts[0].text - ): - text += llm_response.content.parts[0].text - llm_response.partial = True - elif text and ( - not llm_response.content - or not llm_response.content.parts - # don't yield the merged text event when receiving audio data - or not llm_response.content.parts[0].inline_data - ): - yield LlmResponse( - content=types.ModelContent( - parts=[types.Part.from_text(text=text)], - ), - ) - text = "" - yield llm_response - if ( - text - and response - and response.candidates - and response.candidates[0].finish_reason == types.FinishReason.STOP - ): - yield LlmResponse( - content=types.ModelContent( - parts=[types.Part.from_text(text=text)], - ), - ) - - else: - response = await self.api_client.aio.models.generate_content( - model=llm_request.model, - contents=llm_request.contents, - config=llm_request.config, - ) - logger.info(_build_response_log(response)) - yield LlmResponse.create(response) - - @cached_property - def api_client(self) -> Client: - """Provides the api client. - - Returns: - The api client. - """ - return Client(http_options=types.HttpOptions(headers=self._tracking_headers)) - - @cached_property - def _api_backend(self) -> str: - return "vertex" if self.api_client.vertexai else "ml_dev" - - @cached_property - def _tracking_headers(self) -> dict[str, str]: - framework_label = f"google-adk/{version.__version__}" - language_label = "gl-python/" + sys.version.split()[0] - version_header_value = f"{framework_label} {language_label}" - tracking_headers = { - "x-goog-api-client": version_header_value, - "user-agent": version_header_value, - } - return tracking_headers - - @cached_property - def _live_api_client(self) -> Client: - print(f"Using live api client for backend: {self._api_backend}") - if self._api_backend == "vertex": - # use default api version for vertex - return Client( - http_options=types.HttpOptions( - api_version="v1beta1", headers=self._tracking_headers - ) - ) - else: - # use v1alpha for ml_dev - api_version = "v1beta1" - return Client( - http_options=types.HttpOptions( - headers=self._tracking_headers, api_version=api_version - ) - ) - - @contextlib.asynccontextmanager - async def connect(self, llm_request: LlmRequest) -> BaseLlmConnection: - """Connects to the Gemini model and returns an llm connection. - - Args: - llm_request: LlmRequest, the request to send to the Gemini model. - - Yields: - BaseLlmConnection, the connection to the Gemini model. - """ - - llm_request.live_connect_config.system_instruction = types.Content( - role="system", - parts=[types.Part.from_text(text=llm_request.config.system_instruction)], + return [ + r'gemini-.*', + # fine-tuned vertex endpoint pattern + r'projects\/.+\/locations\/.+\/endpoints\/.+', + # vertex gemini long name + r'projects\/.+\/locations\/.+\/publishers\/google\/models\/gemini.+', + ] + + async def generate_content_async( + self, llm_request: LlmRequest, stream: bool = False + ) -> AsyncGenerator[LlmResponse, None]: + """Sends a request to the Gemini model. + + Args: + llm_request: LlmRequest, the request to send to the Gemini model. + stream: bool = False, whether to do streaming call. + + Yields: + LlmResponse: The model response. 
+ """ + + self._maybe_append_user_content(llm_request) + logger.info( + 'Sending out request, model: %s, backend: %s, stream: %s', + llm_request.model, + self._api_backend, + stream, + ) + logger.info(_build_request_log(llm_request)) + + if stream: + responses = await self.api_client.aio.models.generate_content_stream( + model=llm_request.model, + contents=llm_request.contents, + config=llm_request.config, + ) + response = None + text = '' + # for sse, similar as bidi (see receive method in gemini_llm_connecton.py), + # we need to mark those text content as partial and after all partial + # contents are sent, we send an accumulated event which contains all the + # previous partial content. The only difference is bidi rely on + # complete_turn flag to detect end while sse depends on finish_reason. + async for response in responses: + logger.info(_build_response_log(response)) + llm_response = LlmResponse.create(response) + if ( + llm_response.content + and llm_response.content.parts + and llm_response.content.parts[0].text + ): + text += llm_response.content.parts[0].text + llm_response.partial = True + elif text and ( + not llm_response.content + or not llm_response.content.parts + # don't yield the merged text event when receiving audio data + or not llm_response.content.parts[0].inline_data + ): + yield LlmResponse( + content=types.ModelContent( + parts=[types.Part.from_text(text=text)], + ), + ) + text = '' + yield llm_response + if ( + text + and response + and response.candidates + and response.candidates[0].finish_reason == types.FinishReason.STOP + ): + yield LlmResponse( + content=types.ModelContent( + parts=[types.Part.from_text(text=text)], + ), ) - llm_request.live_connect_config.tools = llm_request.config.tools - async with self._live_api_client.aio.live.connect( - model=llm_request.model, - config=llm_request.live_connect_config, - ) as live_session: - yield GeminiLlmConnection(live_session) - - def _maybe_append_user_content(self, llm_request: LlmRequest): - """Appends a user content, so that model can continue to output. - - Args: - llm_request: LlmRequest, the request to send to the Gemini model. - """ - # If no content is provided, append a user content to hint model response - # using system instruction. - if not llm_request.contents: - llm_request.contents.append( - types.Content( - role="user", - parts=[ - types.Part( - text=( - "Handle the requests as specified in the System" - " Instruction." - ) - ) - ], - ) - ) - return - - # Insert a user content to preserve user intent and to avoid empty - # model response. - if llm_request.contents[-1].role != "user": - llm_request.contents.append( - types.Content( - role="user", - parts=[ - types.Part( - text=( - "Continue processing previous requests as instructed." - " Exit or provide a summary if no more outputs are" - " needed." - ) - ) - ], - ) - ) + + else: + response = await self.api_client.aio.models.generate_content( + model=llm_request.model, + contents=llm_request.contents, + config=llm_request.config, + ) + logger.info(_build_response_log(response)) + yield LlmResponse.create(response) + + @cached_property + def api_client(self) -> Client: + """Provides the api client. + + Returns: + The api client. 
+ """ + return Client( + http_options=types.HttpOptions(headers=self._tracking_headers) + ) + + @cached_property + def _api_backend(self) -> str: + return 'vertex' if self.api_client.vertexai else 'ml_dev' + + @cached_property + def _tracking_headers(self) -> dict[str, str]: + framework_label = f'google-adk/{version.__version__}' + language_label = 'gl-python/' + sys.version.split()[0] + version_header_value = f'{framework_label} {language_label}' + tracking_headers = { + 'x-goog-api-client': version_header_value, + 'user-agent': version_header_value, + } + return tracking_headers + + @cached_property + def _live_api_client(self) -> Client: + if self._api_backend == 'vertex': + # use default api version for vertex + api_version = "v1beta1" + return Client( + http_options=types.HttpOptions(headers=self._tracking_headers,version=api_version) + ) + else: + # use v1alpha for ml_dev + api_version = 'v1alpha' + return Client( + http_options=types.HttpOptions( + headers=self._tracking_headers, api_version=api_version + ) + ) + + @contextlib.asynccontextmanager + async def connect(self, llm_request: LlmRequest) -> BaseLlmConnection: + """Connects to the Gemini model and returns an llm connection. + + Args: + llm_request: LlmRequest, the request to send to the Gemini model. + + Yields: + BaseLlmConnection, the connection to the Gemini model. + """ + + llm_request.live_connect_config.system_instruction = types.Content( + role='system', + parts=[ + types.Part.from_text(text=llm_request.config.system_instruction) + ], + ) + llm_request.live_connect_config.tools = llm_request.config.tools + async with self._live_api_client.aio.live.connect( + model=llm_request.model, config=llm_request.live_connect_config + ) as live_session: + yield GeminiLlmConnection(live_session) + + def _maybe_append_user_content(self, llm_request: LlmRequest): + """Appends a user content, so that model can continue to output. + + Args: + llm_request: LlmRequest, the request to send to the Gemini model. + """ + # If no content is provided, append a user content to hint model response + # using system instruction. + if not llm_request.contents: + llm_request.contents.append( + types.Content( + role='user', + parts=[ + types.Part( + text=( + 'Handle the requests as specified in the System' + ' Instruction.' + ) + ) + ], + ) + ) + return + + # Insert a user content to preserve user intent and to avoid empty + # model response. + if llm_request.contents[-1].role != 'user': + llm_request.contents.append( + types.Content( + role='user', + parts=[ + types.Part( + text=( + 'Continue processing previous requests as instructed.' + ' Exit or provide a summary if no more outputs are' + ' needed.' 
+ ) + ) + ], + ) + ) def _build_function_declaration_log( func_decl: types.FunctionDeclaration, ) -> str: - param_str = "{}" - if func_decl.parameters and func_decl.parameters.properties: - param_str = str( - { - k: v.model_dump(exclude_none=True) - for k, v in func_decl.parameters.properties.items() - } - ) - return_str = "None" - if func_decl.response: - return_str = str(func_decl.response.model_dump(exclude_none=True)) - return f"{func_decl.name}: {param_str} -> {return_str}" + param_str = '{}' + if func_decl.parameters and func_decl.parameters.properties: + param_str = str({ + k: v.model_dump(exclude_none=True) + for k, v in func_decl.parameters.properties.items() + }) + return_str = 'None' + if func_decl.response: + return_str = str(func_decl.response.model_dump(exclude_none=True)) + return f'{func_decl.name}: {param_str} -> {return_str}' def _build_request_log(req: LlmRequest) -> str: - function_decls: list[types.FunctionDeclaration] = cast( - list[types.FunctionDeclaration], - req.config.tools[0].function_declarations if req.config.tools else [], - ) - function_logs = ( - [_build_function_declaration_log(func_decl) for func_decl in function_decls] - if function_decls - else [] - ) - contents_logs = [ - content.model_dump_json( - exclude_none=True, - exclude={ - "parts": {i: _EXCLUDED_PART_FIELD for i in range(len(content.parts))} - }, - ) - for content in req.contents - ] - - return f""" + function_decls: list[types.FunctionDeclaration] = cast( + list[types.FunctionDeclaration], + req.config.tools[0].function_declarations if req.config.tools else [], + ) + function_logs = ( + [ + _build_function_declaration_log(func_decl) + for func_decl in function_decls + ] + if function_decls + else [] + ) + contents_logs = [ + content.model_dump_json( + exclude_none=True, + exclude={ + 'parts': { + i: _EXCLUDED_PART_FIELD for i in range(len(content.parts)) + } + }, + ) + for content in req.contents + ] + + return f""" LLM Request: ----------------------------------------------------------- System Instruction: @@ -306,13 +310,13 @@ def _build_request_log(req: LlmRequest) -> str: def _build_response_log(resp: types.GenerateContentResponse) -> str: - function_calls_text = [] - if function_calls := resp.function_calls: - for func_call in function_calls: - function_calls_text.append( - f"name: {func_call.name}, args: {func_call.args}" - ) - return f""" + function_calls_text = [] + if function_calls := resp.function_calls: + for func_call in function_calls: + function_calls_text.append( + f'name: {func_call.name}, args: {func_call.args}' + ) + return f""" LLM Response: ----------------------------------------------------------- Text: From 744703c06716300c0f9f41633d3bafdf4cb180a1 Mon Sep 17 00:00:00 2001 From: Hangfei Lin Date: Sat, 3 May 2025 16:32:01 -0700 Subject: [PATCH 4/7] fix: set right order for input transcription --- src/google/adk/flows/llm_flows/base_llm_flow.py | 13 ++++++++++++- src/google/adk/models/google_llm.py | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/google/adk/flows/llm_flows/base_llm_flow.py b/src/google/adk/flows/llm_flows/base_llm_flow.py index 188f3a5d..df5a5f1b 100644 --- a/src/google/adk/flows/llm_flows/base_llm_flow.py +++ b/src/google/adk/flows/llm_flows/base_llm_flow.py @@ -190,6 +190,16 @@ async def _receive_from_model( llm_request: LlmRequest, ) -> AsyncGenerator[Event, None]: """Receive data from model and process events using BaseLlmConnection.""" + def get_author(llm_response): + """Get the author of the event. 
+ + When the model returns transcription, the author is "user". Otherwise, the author is the agent. + """ + if llm_response and llm_response.content and llm_response.content.role == "user": + return "user" + else: + return invocation_context.agent.name + assert invocation_context.live_request_queue try: while True: @@ -197,7 +207,7 @@ async def _receive_from_model( model_response_event = Event( id=Event.new_id(), invocation_id=invocation_context.invocation_id, - author=invocation_context.agent.name, + author=get_author(llm_response), ) async for event in self._postprocess_live( invocation_context, @@ -354,6 +364,7 @@ async def _postprocess_live( async for event in self._postprocess_run_processors_async( invocation_context, llm_response ): + print("event...1313132:", event.author) yield event # Skip the model response event if there is no content and no error code. diff --git a/src/google/adk/models/google_llm.py b/src/google/adk/models/google_llm.py index 8053b563..ba3004d1 100644 --- a/src/google/adk/models/google_llm.py +++ b/src/google/adk/models/google_llm.py @@ -177,7 +177,7 @@ def _live_api_client(self) -> Client: # use default api version for vertex api_version = "v1beta1" return Client( - http_options=types.HttpOptions(headers=self._tracking_headers,version=api_version) + http_options=types.HttpOptions(headers=self._tracking_headers) ) else: # use v1alpha for ml_dev From 31a5d42d6155b0e5caad0c73c8df43255322016f Mon Sep 17 00:00:00 2001 From: Hangfei Lin Date: Sat, 3 May 2025 16:32:50 -0700 Subject: [PATCH 5/7] remove print --- src/google/adk/flows/llm_flows/base_llm_flow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/google/adk/flows/llm_flows/base_llm_flow.py b/src/google/adk/flows/llm_flows/base_llm_flow.py index df5a5f1b..31904e3c 100644 --- a/src/google/adk/flows/llm_flows/base_llm_flow.py +++ b/src/google/adk/flows/llm_flows/base_llm_flow.py @@ -364,7 +364,6 @@ async def _postprocess_live( async for event in self._postprocess_run_processors_async( invocation_context, llm_response ): - print("event...1313132:", event.author) yield event # Skip the model response event if there is no content and no error code. 
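The author attribution added in PATCH 4/7 (and tidied in PATCH 5/7) is what lets callers tell the user's transcribed speech apart from agent output in the live event stream. A minimal consumer-side sketch, offered only as an illustration: the live_events iterator and the surrounding loop are assumptions about the usual run_live setup, not something these patches define.

    # Hedged sketch: separating user transcription from agent output in the
    # live event stream. "live_events" is an assumed async iterator of ADK
    # Event objects from a live (bidi) run; only the event.author and
    # event.content usage is grounded in the patches above.
    async def handle_live_events(live_events):
        async for event in live_events:
            if event.author == "user" and event.content and event.content.parts:
                # Input transcription surfaced in gemini_llm_connection.py
                # (PATCH 2/7) and attributed to "user" by get_author (PATCH 4/7).
                print("user said:", event.content.parts[0].text)
            else:
                # Agent-authored events (text, audio, tool calls) as before.
                ...
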
From 59e5d9c72060f97d124883150989315401a4c1b5 Mon Sep 17 00:00:00 2001 From: Hangfei Lin Date: Sat, 3 May 2025 16:35:15 -0700 Subject: [PATCH 6/7] remove api version --- src/google/adk/models/google_llm.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/google/adk/models/google_llm.py b/src/google/adk/models/google_llm.py index ba3004d1..342d8bc5 100644 --- a/src/google/adk/models/google_llm.py +++ b/src/google/adk/models/google_llm.py @@ -175,7 +175,6 @@ def _tracking_headers(self) -> dict[str, str]: def _live_api_client(self) -> Client: if self._api_backend == 'vertex': # use default api version for vertex - api_version = "v1beta1" return Client( http_options=types.HttpOptions(headers=self._tracking_headers) ) From 4b782f8489969e50c5514e1bcbd5351d736dadc8 Mon Sep 17 00:00:00 2001 From: Alan B Date: Sun, 4 May 2025 11:29:24 -0600 Subject: [PATCH 7/7] feat: :sparkles: set version to gemini using vertex api --- src/google/adk/models/google_llm.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/google/adk/models/google_llm.py b/src/google/adk/models/google_llm.py index 342d8bc5..e4f21e5f 100644 --- a/src/google/adk/models/google_llm.py +++ b/src/google/adk/models/google_llm.py @@ -174,9 +174,11 @@ def _tracking_headers(self) -> dict[str, str]: @cached_property def _live_api_client(self) -> Client: if self._api_backend == 'vertex': + #use beta version for vertex api + api_version = 'v1beta1' # use default api version for vertex return Client( - http_options=types.HttpOptions(headers=self._tracking_headers) + http_options=types.HttpOptions(headers=self._tracking_headers,api_version=api_version) ) else: # use v1alpha for ml_dev
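
Taken together, the series lets callers opt in to user-side transcription purely through RunConfig. The snippet below is an illustrative sketch, not part of the patches: the agent, session, and streaming runner wiring around it are assumed, and only the two transcription fields come from the diffs above.

    # Hedged example: enabling both transcription directions via RunConfig.
    # Everything outside this object (agent, session, live runner) is assumed
    # and omitted; only the two fields shown are defined in this PR and in the
    # pre-existing run_config.py.
    from google.genai import types

    from google.adk.agents.run_config import RunConfig

    run_config = RunConfig(
        # Existing field: transcribe the model's audio replies.
        output_audio_transcription=types.AudioTranscriptionConfig(),
        # New in PATCH 1/7: transcribe the user's audio input. basic.py copies
        # it onto live_connect_config, the Gemini connection then yields the
        # transcribed text as user-role LlmResponse events (PATCH 2/7), and
        # base_llm_flow attributes those events to the "user" author (PATCH 4/7).
        input_audio_transcription=types.AudioTranscriptionConfig(),
    )

Passing this run_config into the usual live run is, per these diffs, the only caller-visible change needed.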