Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lost Connection During Query / Attribute Error: NoneType object has no attribute write #83

Open
jmsardoy opened this issue Feb 6, 2024 · 0 comments

Comments

@jmsardoy
Copy link

jmsardoy commented Feb 6, 2024

Hi, I'm encountering difficulties with getting Meilisync to function properly due to a recurring issue of lost connection, accompanied by an error upon attempting to reconnect. I'm utilizing MySQL as the data source for Meilisync.

Here's what I've observed:

  • The ctl_conn of the binlogstream is consistently lost after a short period.
  • Whenever a database change is made, an exception is raised: asyncmy.errors.OperationalError: (2013, 'Lost connection to MySQL server during query').
  • Despite attempts to recreate the binlog stream upon encountering this exception, it seems that the self.ctl_conn.connect() line fails to execute as expected.
  • Subsequently, an additional exception is triggered: AttributeError: 'NoneType' object has no attribute 'write', indicating that the self._writer attribute of the connection remains None even after connect() is invoked, which should not be the case.

I managed to make a workound by moving self.ctl_conn initialization inside the _create_stream function on the mysql source class (i would happily make a PR with this change if necessary):

    async def _create_stream(self):
        self.ctl_conn = await asyncmy.connect(**self.kwargs)
        await self.ctl_conn.connect()
        self.stream = BinLogStream(
            self.conn,
            self.ctl_conn,
            server_id=self.server_id,
            master_log_file=self.progress["master_log_file"],
            master_log_position=int(self.progress["master_log_position"]),
            resume_stream=True,
            blocking=True,
            only_schemas=[self.database],
            only_tables=[f"{self.database}.{table}" for table in self.tables],
            only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
        )

    async def __aiter__(self):
        self.conn = await asyncmy.connect(**self.kwargs)
        if not self.progress:
            self.progress = await self.get_current_progress()
        yield ProgressEvent(
            progress=self.progress,
        )
        await self._create_stream()
                while True:
            try:
                async for event in self.stream:
                    ..:
            except OperationalError as e:
                logger.exception(f"Binlog stream error: {e}, sleep 10s and retry...")
                await asyncio.sleep(10)
                try:
                    await self.stream.close()
                    self.ctl_conn.close()
                    await self._create_stream()
                except Exception as e:
                    logger.exception(f"Recreate binlog stream error: {e}")

This adjustment resolves the reconnection issue, yet I aim to discover a method to maintain the connection alive to prevent recurrent OperationalError exception.

Here's the log of the OperationalError

2024-02-06 20:15:59.072 | ERROR    | meilisync.source.mysql:__aiter__:130 - Binlog stream error: (2013, 'Lost connection to MySQL server during query'), sleep 10s and retry...
Traceback (most recent call last):

  File "/usr/local/bin/meilisync", line 6, in <module>
    sys.exit(app())
    │   │    └ <typer.main.Typer object at 0x7f991300ba70>
    │   └ <built-in function exit>
    └ <module 'sys' (built-in)>
  File "/usr/local/lib/python3.12/site-packages/typer/main.py", line 311, in __call__
    return get_command(self)(*args, **kwargs)
           │           │      │       └ {}
           │           │      └ ()
           │           └ <typer.main.Typer object at 0x7f991300ba70>
           └ <function get_command at 0x7f9911e26340>
  File "/usr/local/lib/python3.12/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
           │    │     │       └ {}
           │    │     └ ()
           │    └ <function TyperGroup.main at 0x7f9911e24c20>
           └ <TyperGroup callback>
  File "/usr/local/lib/python3.12/site-packages/typer/core.py", line 778, in main
    return _main(
           └ <function _main at 0x7f9911e1fb00>
  File "/usr/local/lib/python3.12/site-packages/typer/core.py", line 216, in _main
    rv = self.invoke(ctx)
         │    │      └ <click.core.Context object at 0x7f990edf7c20>
         │    └ <function MultiCommand.invoke at 0x7f99122447c0>
         └ <TyperGroup callback>
  File "/usr/local/lib/python3.12/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
           │               │       │       │      └ <click.core.Context object at 0x7f990fe15040>
           │               │       │       └ <function Command.invoke at 0x7f9912244180>
           │               │       └ <TyperCommand start>
           │               └ <click.core.Context object at 0x7f990fe15040>
           └ <function MultiCommand.invoke.<locals>._process_result at 0x7f990e544040>
  File "/usr/local/lib/python3.12/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           │   │      │    │           │   └ {}
           │   │      │    │           └ <click.core.Context object at 0x7f990fe15040>
           │   │      │    └ <function start at 0x7f990e53a020>
           │   │      └ <TyperCommand start>
           │   └ <function Context.invoke at 0x7f9912222ac0>
           └ <click.core.Context object at 0x7f990fe15040>
  File "/usr/local/lib/python3.12/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
                       │       └ {}
                       └ ()
  File "/usr/local/lib/python3.12/site-packages/typer/main.py", line 683, in wrapper
    return callback(**use_params)  # type: ignore
           │          └ {'context': <click.core.Context object at 0x7f990fe15040>}
           └ <function start at 0x7f990e539e40>

  File "/meilisync/meilisync/main.py", line 155, in start
    asyncio.run(run())
    │       │   └ <function start.<locals>.run at 0x7f990e51d3a0>
    │       └ <function run at 0x7f9912dc5e40>
    └ <module 'asyncio' from '/usr/local/lib/python3.12/asyncio/__init__.py'>

  File "/usr/local/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           │      │   └ <coroutine object start.<locals>.run at 0x7f990e512ea0>
           │      └ <function Runner.run at 0x7f991237bce0>
           └ <asyncio.runners.Runner object at 0x7f990e51a300>
  File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           │    │     │                  └ <Task pending name='Task-4' coro=<start.<locals>.run() running at /meilisync/meilisync/main.py:153> wait_for=<_GatheringFutur...
           │    │     └ <function BaseEventLoop.run_until_complete at 0x7f9912379940>
           │    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
           └ <asyncio.runners.Runner object at 0x7f990e51a300>
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
    self.run_forever()
    │    └ <function BaseEventLoop.run_forever at 0x7f99123798a0>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
    self._run_once()
    │    └ <function BaseEventLoop._run_once at 0x7f991237b6a0>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
    handle._run()
    │      └ <function Handle._run at 0x7f99124d9b20>
    └ <Handle Task.task_wakeup(<Future finished result=None>)>
  File "/usr/local/lib/python3.12/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
    │    │            │    │           │    └ <member '_args' of 'Handle' objects>
    │    │            │    │           └ <Handle Task.task_wakeup(<Future finished result=None>)>
    │    │            │    └ <member '_callback' of 'Handle' objects>
    │    │            └ <Handle Task.task_wakeup(<Future finished result=None>)>
    │    └ <member '_context' of 'Handle' objects>
    └ <Handle Task.task_wakeup(<Future finished result=None>)>

  File "/meilisync/meilisync/main.py", line 102, in _
    async for event in source:
              │        └ <meilisync.source.mysql.MySQL object at 0x7f990e9523c0>
              └ Event(progress={'master_log_file': 'mysql-bin-changelog.492584', 'master_log_position': 3540744}, type=<EventType.update: 'up...

> File "/meilisync/meilisync/source/mysql.py", line 108, in __aiter__
    async for event in self.stream:
              │        │    └ <asyncmy.replication.binlogstream.BinLogStream object at 0x7f990e52e5a0>
              │        └ <meilisync.source.mysql.MySQL object at 0x7f990e9523c0>
              └ <asyncmy.replication.row_events.UpdateRowsEvent object at 0x7f990e56c110>

  File "/usr/local/lib/python3.12/site-packages/asyncmy/replication/binlogstream.py", line 374, in __anext__
    ret = await self._read()
                │    └ <function BinLogStream._read at 0x7f990fe5eac0>
                └ <asyncmy.replication.binlogstream.BinLogStream object at 0x7f990e52e5a0>
  File "/usr/local/lib/python3.12/site-packages/asyncmy/replication/binlogstream.py", line 310, in _read
    await binlog_event.init()
          │            └ <function BinLogPacket.init at 0x7f990fe0bb00>
          └ <asyncmy.replication.packets.BinLogPacket object at 0x7f991021eff0>
  File "/usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py", line 140, in init
    self.event and await self.event.init()
    │    │               │    │     └ <function TableMapEvent.init at 0x7f990fe5d260>
    │    │               │    └ <asyncmy.replication.row_events.TableMapEvent object at 0x7f990e56d6d0>
    │    │               └ <asyncmy.replication.packets.BinLogPacket object at 0x7f991021eff0>
    │    └ <asyncmy.replication.row_events.TableMapEvent object at 0x7f990e56d6d0>
    └ <asyncmy.replication.packets.BinLogPacket object at 0x7f991021eff0>
  File "/usr/local/lib/python3.12/site-packages/asyncmy/replication/row_events.py", line 580, in init
    await self._connection._get_table_information(self.schema, self.table_name)
          │    │           │                      │    │       │    └ 'test_courses'
          │    │           │                      │    │       └ <asyncmy.replication.row_events.TableMapEvent object at 0x7f990e56d6d0>
          │    │           │                      │    └ 'test_schema'
          │    │           │                      └ <asyncmy.replication.row_events.TableMapEvent object at 0x7f990e56d6d0>
          │    │           └ <bound method BinLogStream._get_table_information of <asyncmy.replication.binlogstream.BinLogStream object at 0x7f990e52e5a0>>
          │    └ <asyncmy.connection.Connection object at 0x7f990e52f8c0>
          └ <asyncmy.replication.row_events.TableMapEvent object at 0x7f990e56d6d0>
  File "/usr/local/lib/python3.12/site-packages/asyncmy/replication/binlogstream.py", line 350, in _get_table_information
    await cursor.execute(
          │      └ <cyfunction Cursor.execute at 0x7f990fe07440>
          └ <asyncmy.cursors.DictCursor object at 0x7f990e97d760>
  File "asyncmy/cursors.pyx", line 179, in execute
    result = await self._query(query)
  File "asyncmy/cursors.pyx", line 364, in _query
    await conn.query(q)
  File "asyncmy/connection.pyx", line 494, in query
    await self._read_query_result(unbuffered=unbuffered)
  File "asyncmy/connection.pyx", line 682, in _read_query_result
    await result.read()
  File "asyncmy/connection.pyx", line 1069, in read
    first_packet = await self.connection.read_packet()
  File "asyncmy/connection.pyx", line 623, in read_packet
    raise errors.OperationalError(
          │      └ <class 'asyncmy.errors.OperationalError'>
          └ <module 'asyncmy.errors' from '/usr/local/lib/python3.12/site-packages/asyncmy/errors.cpython-312-x86_64-linux-gnu.so'>

asyncmy.errors.OperationalError: (2013, 'Lost connection to MySQL server during query')

Here's the AttributeError Exception


─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /meilisync/meilisync/main.py:155 in start                                                        │
│                                                                                                  │
│   152 │   │   lock = asyncio.Lock()                                                              │
│   153 │   │   await asyncio.gather(_(), interval())                                              │
│   154 │                                                                                          │
│ ❱ 155 │   asyncio.run(run())                                                                     │
│   156                                                                                            │
│   157                                                                                            │
│   158 @app.command(help="Refresh all data by swap index")                                        │
│                                                                                                  │
│ /usr/local/lib/python3.12/asyncio/runners.py:194 in run                                          │
│                                                                                                  │
│   191 │   │   │   "asyncio.run() cannot be called from a running event loop")                    │
│   192 │                                                                                          │
│   193 │   with Runner(debug=debug, loop_factory=loop_factory) as runner:                         │
│ ❱ 194 │   │   return runner.run(main)                                                            │
│   195                                                                                            │
│   196                                                                                            │
│   197 def _cancel_all_tasks(loop):                                                               │
│                                                                                                  │
│                                                                                                  │
│ /usr/local/lib/python3.12/asyncio/runners.py:118 in run                                          │
│                                                                                                  │
│   115 │   │                                                                                      │
│   116 │   │   self._interrupt_count = 0                                                          │
│   117 │   │   try:                                                                               │
│ ❱ 118 │   │   │   return self._loop.run_until_complete(task)                                     │
│   119 │   │   except exceptions.CancelledError:                                                  │
│   120 │   │   │   if self._interrupt_count > 0:                                                  │
│   121 │   │   │   │   uncancel = getattr(task, "uncancel", None)                                 │
│                                                                                                  │
│                                                                                                  │
│ /usr/local/lib/python3.12/asyncio/base_events.py:684 in run_until_complete                       │
│                                                                                                  │
│    681 │   │   if not future.done():                                                             │
│    682 │   │   │   raise RuntimeError('Event loop stopped before Future completed.')             │
│    683 │   │                                                                                     │
│ ❱  684 │   │   return future.result()                                                            │
│    685 │                                                                                         │
│    686 │   def stop(self):                                                                       │
│    687 │   │   """Stop running the event loop.                                                   │
│                                                                                                  │
│                                                                                                  │
│ /meilisync/meilisync/main.py:153 in run                                                          │
│                                                                                                  │
│   150 │   async def run():                                                                       │
│   151 │   │   nonlocal lock                                                                      │
│   152 │   │   lock = asyncio.Lock()                                                              │
│ ❱ 153 │   │   await asyncio.gather(_(), interval())                                              │
│   154 │                                                                                          │
│   155 │   asyncio.run(run())                                                                     │
│   156                                                                                            │
│                                                                                                  │
│                                                                                                  │
│ /meilisync/meilisync/main.py:102 in _                                                            │
│                                                                                                  │
│    99 │   │   │   │   │   │   f'No data found for table "{settings.source.database}.{sync.tabl   │
│   100 │   │   │   │   │   )                                                                      │
│   101 │   │   logger.info(f'Start increment sync data from "{settings.source.type}" to MeiliSe   │
│ ❱ 102 │   │   async for event in source:                                                         │
│   103 │   │   │   if settings.debug:                                                             │
│   104 │   │   │   │   logger.debug(event)                                                        │
│   105 │   │   │   current_progress = event.progress                                              │
│                                                                                                  │
│ /meilisync/meilisync/source/mysql.py:108 in __aiter__                                            │
│                                                                                                  │
│   105 │   │   await self._create_stream()                                                        │
│   106 │   │   while True:                                                                        │
│   107 │   │   │   try:                                                                           │
│ ❱ 108 │   │   │   │   async for event in self.stream:                                            │
│   109 │   │   │   │   │   self.ctl_conn = await asyncmy.connect(**self.kwargs)                   │
│   110 │   │   │   │   │   if isinstance(event, WriteRowsEvent):                                  │
│   111 │   │   │   │   │   │   event_type = EventType.create                                      │
│                                                                                                  │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/binlogstream.py:374 in __anext__     │
│                                                                                                  │
│   371 │   │   │   await self._connect()                                                          │
│   372 │   │   ret = await self._read()                                                           │
│   373 │   │   while ret is None:                                                                 │
│ ❱ 374 │   │   │   ret = await self._read()                                                       │
│   375 │   │   │   continue                                                                       │
│   376 │   │   return ret                                                                         │
│   377                                                                                            │
│                                                                                                  │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/binlogstream.py:310 in _read         │
│                                                                                                  │
│   307 │   │   │   self._ignored_schemas,                                                         │
│   308 │   │   │   self._freeze_schema,                                                           │
│   309 │   │   )                                                                                  │
│ ❱ 310 │   │   await binlog_event.init()                                                          │
│   311 │   │                                                                                      │
│   312 │   │   if binlog_event.event_type == ROTATE_EVENT:                                        │
│   313 │   │   │   self._master_log_position = binlog_event.event.position                        │
│                                                                                                  │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:140 in init               │
│                                                                                                  │
│   137 │   │   │   self.event = None                                                              │
│   138 │                                                                                          │
│   139 │   async def init(self):                                                                  │
│ ❱ 140 │   │   self.event and await self.event.init()                                             │
│   141 │                                                                                          │
│   142 │   def read(self, size):                                                                  │
│   143 │   │   size = int(size)                                                                   │
│                                                                                                  │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/row_events.py:580 in init            │
│                                                                                                  │
│   577 │   │   │   self.column_schemas = self._table_map[self.table_id].column_schemas            │
│   578 │   │   else:                                                                              │
│   579 │   │   │   self.column_schemas = await (                                                  │
│ ❱ 580 │   │   │   │   await self._connection._get_table_information(self.schema, self.table_na   │
│   581 │   │   │   )                                                                              │
│   582 │   │   ordinal_pos_loc = 0                                                                │
│   583                                                                                            │
│                                                                                                  │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/binlogstream.py:350 in               │
│ _get_table_information                                                                           │
│                                                                                                  │
│   347 │                                                                                          │
│   348 │   async def _get_table_information(self, schema, table):                                 │
│   349 │   │   async with self._ctl_connection.cursor(DictCursor) as cursor:                      │
│ ❱ 350 │   │   │   await cursor.execute(                                                          │
│   351 │   │   │   │   """                                                                        │
│   352 │   │   │   │   │   SELECT                                                                 │
│   353 │   │   │   │   │   │   COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,                   │
│                                                                                                  │
│                                                                                                  │
│ in execute:179                                                                                   │
│                                                                                                  │
│ in _query:364                                                                                    │
│                                                                                                  │
│ in query:493                                                                                     │
│                                                                                                  │
│ in _execute_command:729                                                                          │
│                                                                                                  │
│ in asyncmy.connection.Connection._write_bytes:668                                                │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
AttributeError: 'NoneType' object has no attribute 'write'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant