-
-
Notifications
You must be signed in to change notification settings - Fork 248
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
oca-port: blacklist PR(s) 3335, 3345 for spec_driven_model
- Loading branch information
Showing
7 changed files
with
1,359 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{ | ||
"pull_requests": { | ||
"OCA/l10n-brazil#3335": "done in #3442", | ||
"OCA/l10n-brazil#3345": "dotfiles not ported to 15.0 yet" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
# Copyright 2023 KMEE (Felipe Zago Rodrigues <[email protected]>) | ||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). | ||
|
||
from odoo.tests import TransactionCase | ||
|
||
|
||
class TestAccountNFCeContingency(TransactionCase): | ||
<<<<<<< HEAD | ||
@classmethod | ||
def setUpClass(cls): | ||
super().setUpClass() | ||
# this hook is required to test l10n_br_account_nfe alone: | ||
cls.env["spec.mixin.nfe"]._register_hook() | ||
cls.document_id = cls.env.ref("l10n_br_nfe.demo_nfce_same_state") | ||
cls.prepare_account_move_nfce() | ||
||||||| constructed merge base | ||
def setUp(self): | ||
super().setUp() | ||
# this hook is required to test l10n_br_account_nfe alone: | ||
self.env["spec.mixin.nfe"]._register_hook() | ||
self.document_id = self.env.ref("l10n_br_nfe.demo_nfce_same_state") | ||
self.prepare_account_move_nfce() | ||
======= | ||
def setUp(self): | ||
super().setUp() | ||
self.document_id = self.env.ref("l10n_br_nfe.demo_nfce_same_state") | ||
self.prepare_account_move_nfce() | ||
>>>>>>> [REM] l10n_br_account_nfe: drop _register_hook | ||
|
||
@classmethod | ||
def prepare_account_move_nfce(cls): | ||
receivable_account_id = cls.env["account.account"].create( | ||
{ | ||
"name": "TEST ACCOUNT", | ||
"code": "01.1.1.2.2", | ||
"reconcile": 1, | ||
"company_id": cls.env.ref("base.main_company").id, | ||
"user_type_id": cls.env.ref("account.data_account_type_receivable").id, | ||
} | ||
) | ||
payable_account_id = cls.env["account.account"].create( | ||
{ | ||
"name": "TEST ACCOUNT 2", | ||
"code": "01.1.1.2.3", | ||
"reconcile": 1, | ||
"company_id": cls.env.ref("base.main_company").id, | ||
"user_type_id": cls.env.ref("account.data_account_type_payable").id, | ||
} | ||
) | ||
payment_method = cls.env.ref("account.account_payment_method_manual_in").id | ||
journal_id = cls.env["account.journal"].create( | ||
{ | ||
"name": "JOURNAL TEST", | ||
"code": "TEST", | ||
"type": "bank", | ||
"company_id": cls.env.ref("base.main_company").id, | ||
} | ||
) | ||
payment_mode = cls.env["account.payment.mode"].create( | ||
{ | ||
"name": "PAYMENT MODE TEST", | ||
"company_id": cls.env.ref("base.main_company").id, | ||
"payment_method_id": payment_method, | ||
"fiscal_payment_mode": "15", | ||
"bank_account_link": "fixed", | ||
"fixed_journal_id": journal_id.id, | ||
} | ||
) | ||
cls.document_move_id = cls.env["account.move"].create( | ||
{ | ||
"name": "MOVE TEST", | ||
"payment_mode_id": payment_mode.id, | ||
"company_id": cls.env.ref("base.main_company").id, | ||
"line_ids": [ | ||
(0, 0, {"account_id": receivable_account_id.id, "credit": 10}), | ||
(0, 0, {"account_id": payable_account_id.id, "debit": 10}), | ||
], | ||
} | ||
) | ||
cls.document_move_id.fiscal_document_id = cls.document_id.id | ||
|
||
def test_nfce_contingencia(self): | ||
self.document_id._update_nfce_for_offline_contingency() | ||
self.assertIn(self.document_move_id, self.document_id.move_ids) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# Copyright (C) 2019-2020 - Raphael Valyi Akretion | ||
# License AGPL-3 - See http://www.gnu.org/licenses/agpl-3.0.html | ||
import logging | ||
|
||
import nfelib | ||
import pkg_resources | ||
from nfelib.nfe.bindings.v4_0.leiaute_nfe_v4_00 import TnfeProc | ||
|
||
from odoo import SUPERUSER_ID, api | ||
from odoo.exceptions import ValidationError | ||
|
||
from odoo.addons.spec_driven_model import hooks | ||
|
||
_logger = logging.getLogger(__name__) | ||
|
||
|
||
def post_init_hook(cr, registry): | ||
env = api.Environment(cr, SUPERUSER_ID, {}) | ||
<<<<<<< HEAD | ||
hooks.register_hook( | ||
env, "l10n_br_nfe", "odoo.addons.l10n_br_nfe_spec.models.v4_0.leiaute_nfe_v4_00" | ||
) | ||
|
||
||||||| constructed merge base | ||
env["nfe.40.infnfe"]._register_hook() | ||
======= | ||
>>>>>>> [REF] l10n_br_nfe: further multi-schemas | ||
cr.execute("select demo from ir_module_module where name='l10n_br_nfe';") | ||
is_demo = cr.fetchone()[0] | ||
if is_demo: | ||
res_items = ( | ||
"nfe", | ||
"samples", | ||
"v4_0", | ||
"leiauteNFe", | ||
"35180834128745000152550010000474491454651420-nfe.xml", | ||
) | ||
resource_path = "/".join(res_items) | ||
nfe_stream = pkg_resources.resource_stream(nfelib.__name__, resource_path) | ||
binding = TnfeProc.from_xml(nfe_stream.read().decode()) | ||
document_number = binding.NFe.infNFe.ide.nNF | ||
existing_nfes = env["l10n_br_fiscal.document"].search( | ||
[("document_number", "=", document_number)] | ||
) | ||
try: | ||
existing_nfes.unlink() | ||
nfe = ( | ||
env["nfe.40.infnfe"] | ||
.with_context(tracking_disable=True, edoc_type="in") | ||
.build_from_binding("nfe", "40", binding.NFe.infNFe) | ||
) | ||
_logger.info(nfe.nfe40_emit.nfe40_CNPJ) | ||
except ValidationError: | ||
_logger.info("NF-e already %s imported by hooks" % (document_number,)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
# Copyright 2019 KMEE | ||
# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl-3.0.en.html). | ||
|
||
import logging | ||
import sys | ||
|
||
from odoo import api, fields, models | ||
|
||
_logger = logging.getLogger(__name__) | ||
|
||
|
||
class SpecMixinExport(models.AbstractModel): | ||
_name = "spec.mixin_export" | ||
_description = "a mixin providing serialization features" | ||
|
||
@api.model | ||
def _get_binding_class(self, class_obj): | ||
binding_module = sys.modules[self._get_spec_property("binding_module")] | ||
for attr in class_obj._binding_type.split("."): | ||
binding_module = getattr(binding_module, attr) | ||
return binding_module | ||
|
||
@api.model | ||
def _get_model_classes(self): | ||
classes = [getattr(x, "_name", None) for x in type(self).mro()] | ||
return classes | ||
|
||
@api.model | ||
def _get_spec_classes(self, classes=False): | ||
if not classes: | ||
classes = self._get_model_classes() | ||
spec_classes = [] | ||
for c in set(classes): | ||
if c is None: | ||
continue | ||
<<<<<<< HEAD | ||
if not c.startswith("%s." % (self._schema_name,)): | ||
||||||| constructed merge base | ||
if not c.startswith(f"{self._schema_name}."): | ||
======= | ||
if not c.startswith(f"{self._context['spec_schema']}."): | ||
>>>>>>> [REF] spec_driven_model: further multi-schemas | ||
continue | ||
# the following filter to fields to show | ||
# when several XSD class are injected in the same object | ||
if self._context.get("spec_class") and c != self._context["spec_class"]: | ||
continue | ||
spec_classes.append(c) | ||
return spec_classes | ||
|
||
def _export_fields(self, xsd_fields, class_obj, export_dict): | ||
""" | ||
Iterate over the record fields and map them in an dict of values | ||
that will later be injected as **kwargs in the proper XML Python | ||
binding constructors. Hence the value can either be simple values or | ||
sub binding instances already properly instanciated. | ||
|
||
This method implements a dynamic dispatch checking if there is any | ||
method called _export_fields_CLASS_NAME to update the xsd_fields | ||
and export_dict variables, this way we allow controlling the | ||
flow of fields to export or injecting specific values in the | ||
field export. | ||
""" | ||
self.ensure_one() | ||
binding_class = self._get_binding_class(class_obj) | ||
binding_class_spec = binding_class.__dataclass_fields__ | ||
|
||
class_name = class_obj._name.replace(".", "_") | ||
export_method_name = "_export_fields_%s" % class_name | ||
if hasattr(self, export_method_name): | ||
xsd_fields = [i for i in xsd_fields] | ||
export_method = getattr(self, export_method_name) | ||
export_method(xsd_fields, class_obj, export_dict) | ||
|
||
for xsd_field in xsd_fields: | ||
if not xsd_field: | ||
continue | ||
if ( | ||
not self._fields.get(xsd_field) | ||
) and xsd_field not in self._get_stacking_points().keys(): | ||
continue | ||
field_spec_name = xsd_field.split("_")[1] # remove schema prefix | ||
field_spec = False | ||
for fname, fspec in binding_class_spec.items(): | ||
if fspec.metadata.get("name", {}) == field_spec_name: | ||
field_spec_name = fname | ||
if field_spec_name == fname: | ||
field_spec = fspec | ||
if field_spec and not field_spec.init: | ||
# case of xsd fixed values, we should not try to write them | ||
continue | ||
|
||
if not binding_class_spec.get(field_spec_name): | ||
# this can happen with a o2m generated foreign key for instance | ||
continue | ||
field_spec = binding_class_spec[field_spec_name] | ||
field_data = self._export_field( | ||
xsd_field, class_obj, field_spec, export_dict.get(field_spec_name) | ||
) | ||
if xsd_field in self._get_stacking_points().keys(): | ||
if not field_data: | ||
# stacked nested tags are skipped if empty | ||
continue | ||
elif not self[xsd_field] and not field_data: | ||
continue | ||
|
||
export_dict[field_spec_name] = field_data | ||
|
||
def _export_field(self, xsd_field, class_obj, field_spec, export_value=None): | ||
""" | ||
Map a single Odoo field to a python binding value according to the | ||
kind of field. | ||
""" | ||
self.ensure_one() | ||
# TODO: Export number required fields with Zero. | ||
field = class_obj._fields.get( | ||
xsd_field, self._get_stacking_points().get(xsd_field) | ||
) | ||
xsd_required = field.xsd_required if hasattr(field, "xsd_required") else None | ||
xsd_type = field.xsd_type if hasattr(field, "xsd_type") else None | ||
if field.type == "many2one": | ||
if (not self._get_stacking_points().get(xsd_field)) and ( | ||
not self[xsd_field] and not xsd_required | ||
): | ||
if field.comodel_name not in self._get_spec_classes(): | ||
return False | ||
if hasattr(field, "xsd_choice_required"): | ||
xsd_required = True | ||
return self._export_many2one(xsd_field, xsd_required, class_obj) | ||
elif self._fields[xsd_field].type == "one2many": | ||
return self._export_one2many(xsd_field, class_obj) | ||
elif self._fields[xsd_field].type == "datetime" and self[xsd_field]: | ||
return self._export_datetime(xsd_field) | ||
elif self._fields[xsd_field].type == "date" and self[xsd_field]: | ||
return self._export_date(xsd_field) | ||
elif ( | ||
self._fields[xsd_field].type in ("float", "monetary") | ||
and self[xsd_field] is not False | ||
): | ||
if hasattr(field, "xsd_choice_required"): | ||
xsd_required = True | ||
return self._export_float_monetary( | ||
xsd_field, xsd_type, class_obj, xsd_required, export_value | ||
) | ||
elif type(self[xsd_field]) is str: | ||
return self[xsd_field].strip() | ||
else: | ||
return self[xsd_field] | ||
|
||
def _export_many2one(self, field_name, xsd_required, class_obj=None): | ||
self.ensure_one() | ||
if field_name in self._get_stacking_points().keys(): | ||
return self._build_binding( | ||
class_name=self._get_stacking_points()[field_name].comodel_name | ||
) | ||
else: | ||
return (self[field_name] or self)._build_binding( | ||
class_name=class_obj._fields[field_name].comodel_name | ||
) | ||
|
||
def _export_one2many(self, field_name, class_obj=None): | ||
self.ensure_one() | ||
relational_data = [] | ||
for relational_field in self[field_name]: | ||
field_data = relational_field._build_binding( | ||
class_name=class_obj._fields[field_name].comodel_name | ||
) | ||
relational_data.append(field_data) | ||
return relational_data | ||
|
||
def _export_float_monetary( | ||
self, field_name, xsd_type, class_obj, xsd_required, export_value=None | ||
): | ||
self.ensure_one() | ||
field_data = export_value or self[field_name] | ||
# TODO check xsd_required for all fields to export? | ||
if not field_data and not xsd_required: | ||
return False | ||
if xsd_type and xsd_type.startswith("TDec"): | ||
tdec = "".join(filter(lambda x: x.isdigit(), xsd_type))[-2:] | ||
else: | ||
tdec = "" | ||
my_format = "%.{}f".format(tdec) | ||
return str(my_format % field_data) | ||
|
||
def _export_date(self, field_name): | ||
self.ensure_one() | ||
return str(self[field_name]) | ||
|
||
def _export_datetime(self, field_name): | ||
self.ensure_one() | ||
return str( | ||
fields.Datetime.context_timestamp( | ||
self, fields.Datetime.from_string(self[field_name]) | ||
).isoformat("T") | ||
) | ||
|
||
def _build_binding(self, spec_schema=None, spec_version=None, class_name=None): | ||
""" | ||
Iterate over an Odoo record and its m2o and o2m sub-records | ||
using a pre-order tree traversal and map the Odoo record values | ||
to a dict of Python binding values. | ||
|
||
These values will later be injected as **kwargs in the proper XML Python | ||
binding constructors. Hence the value can either be simple values or | ||
sub binding instances already properly instanciated. | ||
""" | ||
self.ensure_one() | ||
if spec_schema and spec_version: | ||
self = self.with_context(spec_schema=spec_schema, spec_version=spec_version) | ||
self.env[f"spec.mixin.{spec_schema}"]._register_hook() | ||
if not class_name: | ||
class_name = self._get_spec_property("stacking_mixin", self._name) | ||
|
||
class_obj = self.env[class_name] | ||
|
||
xsd_fields = ( | ||
i | ||
for i in class_obj._fields | ||
if class_obj._fields[i].name.startswith(f"{self._spec_prefix()}_") | ||
and "_choice" not in class_obj._fields[i].name | ||
) | ||
|
||
kwargs = {} | ||
binding_class = self._get_binding_class(class_obj) | ||
self._export_fields(xsd_fields, class_obj, export_dict=kwargs) | ||
sliced_kwargs = { | ||
key: kwargs.get(key) | ||
for key in binding_class.__dataclass_fields__.keys() | ||
if kwargs.get(key) | ||
} | ||
return binding_class(**sliced_kwargs) |
Oops, something went wrong.