When running the DAG with older versions of HttpAsyncHook, the task fails with the following error:
TypeError: Only io.IOBase, multidict and (name, file) pairs allowed
This is the log output:
3cfd611b7c04
*** Found local files:
*** * /opt/airflow/logs/dag_id=new_relic_data_pipeline_observability_get_dbt_run_metadata2/run_id=scheduled__2024-06-07T12:45:00+00:00/task_id=process_runs/attempt=1.log
[2024-06-20, 13:00:43 EDT] {local_task_job_runner.py:120} ▶ Pre task execution logs
[2024-06-20, 13:00:43 EDT] {logging_mixin.py:188} INFO - Sending 1 to New Relic
[2024-06-20, 13:00:43 EDT] {logging_mixin.py:188} INFO - Run ids: ['291970397']
[2024-06-20, 13:00:43 EDT] {base.py:84} INFO - Using connection ID 'nr_insights_insert' for task execution.
[2024-06-20, 13:00:43 EDT] {logging_mixin.py:188} INFO - Sending Chunk 1 records for eventType: dbt_job_run
[2024-06-20, 13:00:43 EDT] {base.py:84} INFO - Using connection ID 'nr_insights_insert' for task execution.
[2024-06-20, 13:00:43 EDT] {taskinstance.py:441} ▼ Post task execution logs
[2024-06-20, 13:00:43 EDT] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/aiohttp/client_reqrep.py", line 551, in update_body_from_data
body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/aiohttp/payload.py", line 119, in get
raise LookupError()
aiohttp.payload.LookupError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/decorators/base.py", line 265, in execute
return_value = super().execute(context)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/operators/python.py", line 235, in execute
return_value = self.execute_callable()
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/operators/python.py", line 252, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/dags/dbt_cloud_run_metadata/dag.py", line 254, in process_runs
upload_data(runs, nr_insights_insert, chunk_size=500)
File "/opt/airflow/dags/nr_utils/http.py", line 25, in upload_data
data = loop.run_until_complete(asyncio.gather(*responses))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/opt/airflow/dags/nr_utils/http.py", line 6, in run_in_loop
response = await hook.run(data=data, headers=headers, extra_options={'compress': True})
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/http/hooks/http.py", line 388, in run
response = await request_func(
^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/aiohttp/client.py", line 548, in _request
req = self._request_class(
^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/aiohttp/client_reqrep.py", line 335, in init
self.update_body_from_data(data)
File "/home/airflow/.local/lib/python3.12/site-packages/aiohttp/client_reqrep.py", line 553, in update_body_from_data
body = FormData(body)()
^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/aiohttp/formdata.py", line 38, in init
self.add_fields(*fields)
File "/home/airflow/.local/lib/python3.12/site-packages/aiohttp/formdata.py", line 117, in add_fields
raise TypeError(
TypeError: Only io.IOBase, multidict and (name, file) pairs allowed, use .add_field() for passing more complex parameters, got
...
[2024-06-20, 13:00:43 EDT] {local_task_job_runner.py:240} INFO - Task exited with return code 1
[2024-06-20, 13:00:43 EDT] {taskinstance.py:3498} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-06-20, 13:00:43 EDT] {local_task_job_runner.py:222} ▲▲▲ Log group end
The http provider has gone through many changes across versions. Older versions of HttpAsyncHook expect hook.run(data=data), while newer versions expect hook.run(json=data). Passing a dict via data= on an older version hands it straight to aiohttp, which tries to interpret it as form data and raises the TypeError above. To fix this, modify /opt/airflow/dags/nr_utils/http.py accordingly.
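One way to keep a single DAG working across provider versions is to feature-detect which keyword the installed hook's run() accepts and pre-serialize the payload only when needed. This is a minimal sketch, not the provider's own API: the helper name run_kwargs_for is mine, and the exact provider version where the json= keyword appeared should be checked against the apache-airflow-providers-http changelog.

```python
import inspect
import json


def run_kwargs_for(hook_run, payload: dict) -> dict:
    """Build the keyword arguments for HttpAsyncHook.run().

    Newer apache-airflow-providers-http releases accept a `json=`
    keyword and serialize the dict themselves; older releases pass
    `data=` straight to aiohttp, so a dict must be serialized to a
    JSON string first or aiohttp treats it as multipart form fields.
    """
    params = inspect.signature(hook_run).parameters
    if "json" in params:
        return {"json": payload}           # newer provider: hook serializes
    return {"data": json.dumps(payload)}   # older provider: pre-serialize
```

In run_in_loop the call from the traceback would then become something like `response = await hook.run(headers=headers, extra_options={'compress': True}, **run_kwargs_for(hook.run, data))`; if you know which provider version you run, hard-coding the matching keyword is simpler.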