Skip to content

Commit

Permalink
[IMP] snippets: move all work from parent to mp workers
Browse files Browse the repository at this point in the history
In `convert_html_columns()`, we select 100MiB worth of DB tuples and pass them
to a ProcessPoolExecutor together with a converter callable. So far, the
converter returns all tuples, changed or unchanged together with the
information if it has changed something. All this is returned through IPC to
the parent process. In the parent process, the caller only acts on the changed
tuples, though, the rest is ignored. In any scenario I've seen, only a small
proportion of the input tuples is actually changed, meaning that a large
proportion is returned through IPC unnecessarily.

What makes it worse is that processing of the converted results in the parent
process is often slower than the conversion, leading to two effects:
1) The results of all workers sit in the parent process's memory, possibly
   leading to MemoryError (upg-2021031)
2) The parallel processing is being serialized on the feedback, defeating a
   large part of the intended performance gains

To improve this, this commit
- moves all work into the workers, meaning not just the conversion filter, but
  also the DB query as well as the DB updates.
  - by doing so reduces the amount of data passed by IPC to just the query
    texts
  - by doing so distributes the data held in memory to all worker processes
- reduces the chunk size by one order of magnitude, which means
  - a lot less memory used at a time
  - a lot better distribution of "to-be-changed" rows when these rows are
    clustered in the table

All in all, in my test case, this
- reduces maximum process size in memory to 300MiB for all processes compared
  to formerly >2GiB (and MemoryError) in the parent process
- reduces runtime from 17 minutes to less than 2 minutes
  • Loading branch information
cawo-odoo committed Sep 23, 2024
1 parent 062d11a commit a697852
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 41 deletions.
57 changes: 30 additions & 27 deletions src/base/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
except ImportError:
import mock

from odoo import SUPERUSER_ID, api
from odoo.osv.expression import FALSE_LEAF, TRUE_LEAF
from odoo.tools import mute_logger
from odoo.tools.safe_eval import safe_eval
Expand Down Expand Up @@ -1436,33 +1437,35 @@ def not_doing_anything_converter(el):

class TestHTMLFormat(UnitTestCase):
def testsnip(self):
view_arch = """
<html>
<div class="fake_class_not_doing_anything"><br/></div>
<script>
(event) =&gt; {
};
</script>
</html>
"""
view_id = self.env["ir.ui.view"].create(
{
"name": "not_for_anything",
"type": "qweb",
"mode": "primary",
"key": "test.htmlconvert",
"arch_db": view_arch,
}
)
cr = self.env.cr
snippets.convert_html_content(
cr,
snippets.html_converter(
not_doing_anything_converter, selector="//*[hasclass('fake_class_not_doing_anything')]"
),
)
util.invalidate(view_id)
res = self.env["ir.ui.view"].search_read([("id", "=", view_id.id)], ["arch_db"])
with self.registry.cursor() as cr:
env = api.Environment(cr, SUPERUSER_ID, {})
view_arch = """
<html>
<div class="fake_class_not_doing_anything"><br/></div>
<script>
(event) =&gt; {
};
</script>
</html>
"""
view_id = env["ir.ui.view"].create(
{
"name": "not_for_anything",
"type": "qweb",
"mode": "primary",
"key": "test.htmlconvert",
"arch_db": view_arch,
}
)
snippets.convert_html_content(
cr,
snippets.html_converter(
not_doing_anything_converter, selector="//*[hasclass('fake_class_not_doing_anything')]"
),
)
util.invalidate(view_id)
res = env["ir.ui.view"].search_read([("id", "=", view_id.id)], ["arch_db"])
view_id.unlink()
self.assertEqual(len(res), 1)
oneline = lambda s: re.sub(r"\s+", " ", s.strip())
self.assertEqual(oneline(res[0]["arch_db"]), oneline(view_arch))
Expand Down
51 changes: 37 additions & 14 deletions src/util/snippets.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import concurrent
import inspect
import logging
import re
Expand All @@ -11,6 +12,11 @@
from psycopg2.extensions import quote_ident
from psycopg2.extras import Json

try:
from odoo.sql_db import db_connect
except ImportError:
from openerp.sql_db import db_connect

from .const import NEARLYWARN
from .exceptions import MigrationError
from .helpers import table_of_model
Expand Down Expand Up @@ -243,16 +249,26 @@ def _dumps(self, node):


class Convertor:
def __init__(self, converters, callback):
def __init__(self, converters, callback, dbname, update_query):
self.converters = converters
self.callback = callback
self.dbname = dbname
self.update_query = update_query

def __call__(self, query):
with db_connect(self.dbname).cursor() as cr:
cr.execute(query)
for self.row in cr.fetchall():
self._convert_row()
if "id" in self.row:
cr.execute(self.update_query, self.row)

def __call__(self, row):
def _convert_row(self):
converters = self.converters
columns = self.converters.keys()
converter_callback = self.callback
res_id, *contents = row
changes = {}
res_id, *contents = self.row
self.row = {}
for column, content in zip(columns, contents):
if content and converters[column]:
# jsonb column; convert all keys
Expand All @@ -264,10 +280,9 @@ def __call__(self, row):
new_content = Json(new_content)
else:
has_changed, new_content = converter_callback(content)
changes[column] = new_content
self.row[column] = new_content
if has_changed:
changes["id"] = res_id
return changes
self.row["id"] = res_id


def convert_html_columns(cr, table, columns, converter_callback, where_column="IS NOT NULL", extra_where="true"):
Expand Down Expand Up @@ -305,17 +320,25 @@ def convert_html_columns(cr, table, columns, converter_callback, where_column="I
update_sql = ", ".join(f'"{column}" = %({column})s' for column in columns)
update_query = f"UPDATE {table} SET {update_sql} WHERE id = %(id)s"

cr.commit()
with ProcessPoolExecutor(max_workers=get_max_workers()) as executor:
convert = Convertor(converters, converter_callback)
for query in log_progress(split_queries, logger=_logger, qualifier=f"{table} updates"):
cr.execute(query)
for data in executor.map(convert, cr.fetchall(), chunksize=1000):
if "id" in data:
cr.execute(update_query, data)
convert = Convertor(converters, converter_callback, cr.dbname, update_query)
futures = [executor.submit(convert, query) for query in split_queries]
for future in log_progress(
concurrent.futures.as_completed(futures),
logger=_logger,
qualifier=f"{table} updates",
size=len(split_queries),
estimate=False,
log_hundred_percent=True,
):
# just for raising any worker exception
future.result()
cr.commit()


def determine_chunk_limit_ids(cr, table, column_arr, where):
bytes_per_chunk = 100 * 1024 * 1024
bytes_per_chunk = 10 * 1024 * 1024
columns = ", ".join(quote_ident(column, cr._cnx) for column in column_arr if column != "id")
cr.execute(
f"""
Expand Down

0 comments on commit a697852

Please sign in to comment.