diff --git a/src/base/tests/test_util.py b/src/base/tests/test_util.py
index b2b169d8..90eadd94 100644
--- a/src/base/tests/test_util.py
+++ b/src/base/tests/test_util.py
@@ -13,6 +13,7 @@ except ImportError:
     import mock

+from odoo import SUPERUSER_ID, api
 from odoo.osv.expression import FALSE_LEAF, TRUE_LEAF
 from odoo.tools import mute_logger
 from odoo.tools.safe_eval import safe_eval
@@ -1436,6 +1437,8 @@ def not_doing_anything_converter(el):

 class TestHTMLFormat(UnitTestCase):
     def testsnip(self):
+        cr = self.registry.cursor()
+        env = api.Environment(cr, SUPERUSER_ID, {})
         view_arch = """

@@ -1445,7 +1448,7 @@ def testsnip(self):
         """
-        view_id = self.env["ir.ui.view"].create(
+        view_id = env["ir.ui.view"].create(
             {
                 "name": "not_for_anything",
                 "type": "qweb",
@@ -1454,7 +1457,6 @@ def testsnip(self):
                 "arch_db": view_arch,
             }
         )
-        cr = self.env.cr
         snippets.convert_html_content(
             cr,
             snippets.html_converter(
@@ -1462,7 +1464,10 @@ def testsnip(self):
             ),
         )
         util.invalidate(view_id)
-        res = self.env["ir.ui.view"].search_read([("id", "=", view_id.id)], ["arch_db"])
+        res = env["ir.ui.view"].search_read([("id", "=", view_id.id)], ["arch_db"])
+        view_id.unlink()
+        cr.commit()
+        cr.close()
         self.assertEqual(len(res), 1)
         oneline = lambda s: re.sub(r"\s+", " ", s.strip())
         self.assertEqual(oneline(res[0]["arch_db"]), oneline(view_arch))
diff --git a/src/util/snippets.py b/src/util/snippets.py
index 76b77015..b8ff9310 100644
--- a/src/util/snippets.py
+++ b/src/util/snippets.py
@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
+import concurrent
 import inspect
 import logging
 import re
@@ -11,6 +12,11 @@
 from psycopg2.extensions import quote_ident
 from psycopg2.extras import Json

+try:
+    from odoo.sql_db import db_connect
+except ImportError:
+    from openerp.sql_db import db_connect
+
 from .const import NEARLYWARN
 from .exceptions import MigrationError
 from .helpers import table_of_model
@@ -243,31 +249,39 @@ def _dumps(self, node):


 class Convertor:
-    def __init__(self, converters, callback):
+    def __init__(self, converters, callback, dbname, update_query):
         self.converters = converters
         self.callback = callback
+        self.dbname = dbname
+        self.update_query = update_query

-    def __call__(self, row):
+    def __call__(self, query):
         converters = self.converters
         columns = self.converters.keys()
         converter_callback = self.callback
-        res_id, *contents = row
-        changes = {}
-        for column, content in zip(columns, contents):
-            if content and converters[column]:
-                # jsonb column; convert all keys
-                new_content = {}
-                has_changed, new_content["en_US"] = converter_callback(content.pop("en_US"))
-                if has_changed:
-                    for lang, value in content.items():
-                        _, new_content[lang] = converter_callback(value)
-                new_content = Json(new_content)
-            else:
-                has_changed, new_content = converter_callback(content)
-            changes[column] = new_content
-            if has_changed:
-                changes["id"] = res_id
-        return changes
+        update_query = self.update_query
+        with db_connect(self.dbname).cursor() as cr:
+            cr.execute(query)
+            for row in cr.fetchall():
+                res_id, *contents = row
+                changes = {}
+                for column, content in zip(columns, contents):
+                    if content and converters[column]:
+                        # jsonb column; convert all keys
+                        new_content = {}
+                        has_changed, new_content["en_US"] = converter_callback(content.pop("en_US"))
+                        if has_changed:
+                            for lang, value in content.items():
+                                _, new_content[lang] = converter_callback(value)
+                        new_content = Json(new_content)
+                    else:
+                        has_changed, new_content = converter_callback(content)
+                    changes[column] = new_content
+                    if has_changed:
+                        changes["id"] = res_id
+                if "id" in changes:
+                    cr.execute(update_query, changes)
+            cr.commit()


 def convert_html_columns(cr, table, columns, converter_callback, where_column="IS NOT NULL", extra_where="true"):
@@ -305,17 +319,25 @@ def convert_html_columns(cr, table, columns, converter_callback, where_column="I
     update_sql = ", ".join(f'"{column}" = %({column})s' for column in columns)
     update_query = f"UPDATE {table} SET {update_sql} WHERE id = %(id)s"

+    cr.commit()
     with ProcessPoolExecutor(max_workers=get_max_workers()) as executor:
-        convert = Convertor(converters, converter_callback)
-        for query in log_progress(split_queries, logger=_logger, qualifier=f"{table} updates"):
-            cr.execute(query)
-            for data in executor.map(convert, cr.fetchall(), chunksize=1000):
-                if "id" in data:
-                    cr.execute(update_query, data)
+        convert = Convertor(converters, converter_callback, cr.dbname, update_query)
+        futures = [executor.submit(convert, query) for query in split_queries]
+        for future in log_progress(
+            concurrent.futures.as_completed(futures),
+            logger=_logger,
+            qualifier=f"{table} updates",
+            size=len(split_queries),
+            estimate=False,
+            log_hundred_percent=True,
+        ):
+            # just for raising any worker exception
+            future.result()
+    cr.commit()


 def determine_chunk_limit_ids(cr, table, column_arr, where):
-    bytes_per_chunk = 100 * 1024 * 1024
+    bytes_per_chunk = 10 * 1024 * 1024
     columns = ", ".join(quote_ident(column, cr._cnx) for column in column_arr if column != "id")
     cr.execute(
         f"""
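
A note on the pattern above (commentary, not part of the patch): instead of the main cursor fetching rows and farming them out via executor.map, each worker now receives a whole SELECT query, opens its own cursor through db_connect(self.dbname), writes its updates back, and commits independently; the main process only drains as_completed(futures) and calls future.result() so any worker exception is re-raised. A minimal runnable sketch of that fan-out and error-propagation shape, with a stub convert standing in for Convertor (names and queries here are illustrative only):

```python
# Sketch only: `convert` stands in for Convertor.__call__, which would open
# its own cursor via db_connect(dbname), run the SELECT, and UPDATE + commit.
from concurrent.futures import ProcessPoolExecutor, as_completed


def convert(query):
    # A real worker executes `query`, converts each row, and writes it back.
    if "boom" in query:
        raise ValueError(f"worker failed on {query!r}")


if __name__ == "__main__":
    split_queries = [f"SELECT ... OFFSET {i * 1000}" for i in range(5)]
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(convert, q) for q in split_queries]
        for future in as_completed(futures):
            future.result()  # just for raising any worker exception
```

Because workers commit on their own connections, the cr.commit() added before the pool starts presumably makes the main transaction's prior work visible to the worker connections (and releases any locks it holds), while the one after the pool lets the main cursor continue on a fresh snapshot that includes the workers' writes.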