-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathodoo.py
377 lines (285 loc) · 12.4 KB
/
odoo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
import logging
import socket
import ssl
import time
import xmlrpc.client
from typing import Any, Dict, List, Tuple, Union
# Module-wide logger; every class and helper in this file reports through it.
logger = logging.getLogger('odoo_connector')
# An Odoo search domain: a list of (field, operator, value) triples.
# The value slot is typing.Any (the builtin function `any` is not a type).
DomainT = List[Tuple[str, str, Any]]
# A single record id or a list of record ids.
IdsT = Union[int, List[int]]
class x2m:
    """ Describes a to-many relational field that should be expanded into records

    :param field: name of the relational field on the parent record
    :param model: name of the model the field points to
    :param fields: fields to read from the related model
    """
    _type = 'x2m'

    def __init__(self, field: str, model: str, fields: List[str]):
        self.field_name, self.model, self.fields = field, model, fields

    def gather_ids_to_fetch(self, records: List[dict]) -> list:
        """ Collects every distinct related id referenced by the given records
        :param records: parent record dicts, each holding an iterable of ids under this field
        :return: list of unique ids (order not guaranteed)
        """
        unique_ids = {
            related_id
            for row in records
            for related_id in row[self.field_name]
        }
        return list(unique_ids)

    def field_to_recordset(self, records: List[dict], field_records: Dict[str, dict]):
        """ Replaces each record's list of ids with the matching related records
        :param records: parent record dicts (modified in place)
        :param field_records: related records, looked up by the ids stored in the field
        :return: the same records list that was passed in
        """
        key = self.field_name
        for row in records:
            # A missing id raises KeyError, exactly like a plain dict lookup
            row[key] = [field_records[related_id] for related_id in row[key]]
        return records
class m2o(x2m):
    """ Describes a many-to-one relational field that should be expanded into a record

    The field value is read as a sequence whose first element is the related id;
    a falsy value means the field is unset.
    """
    _type = 'm2o'

    def gather_ids_to_fetch(self, records: List[dict]) -> list:
        """ Collects every distinct related id referenced by the given records
        :param records: parent record dicts
        :return: list of unique ids (order not guaranteed); unset fields are skipped
        """
        return list({
            reference[0]
            for row in records
            if (reference := row[self.field_name])
        })

    def field_to_recordset(self, records: List[dict], field_records: Dict[str, dict]):
        """ Replaces each record's reference with the matching related record
        :param records: parent record dicts (modified in place)
        :param field_records: related records, looked up by the id stored in the field
        :return: the same records list that was passed in
        """
        key = self.field_name
        for row in records:
            reference = row[key]
            if not reference:
                # Unset field: leave the original falsy value untouched
                continue
            row[key] = field_records[reference[0]]
        return records
# A field selection: a list whose items are either plain field names or
# x2m/m2o descriptors for relational fields that should be expanded.
FieldsT = List[Union[str, x2m]]
class Model:
    """ Proxy for a single Odoo model, bound to a connection

    Every operation is forwarded through ``env._exec``. Operations that modify
    data are only sent when the matching permission flag on ``env`` is set;
    when it is not, they log the request and return None without contacting Odoo.
    """

    def __init__(self, env, model: str):
        """
        :param env: connection object providing _exec, _perm_write and _perm_call
        :param model: name of the model this proxy operates on
        """
        self.env = env
        self.model = model

    def call(self, ids: IdsT, method: str, *args, **kwargs):
        """ Calls a method on selected record ids
        :param ids: ids to call the method on
        :param method: name of method to call
        :param args: method args
        :param kwargs: method kwargs
        :return: same as method return (None when call permission is disabled)
        """
        if isinstance(ids, int):
            ids = [ids]

        logger.debug(f"Call_Records ({self.model}:{ids}): {args} {kwargs}")
        if self.env._perm_call:
            # The id list is the first positional argument, followed by the caller's args
            return self.env._exec(self.model, method, [ids] + list(args), kwargs)

    def call_model(self, method: str, *args, **kwargs):
        """ Calls a model method
        :param method: name of method to call
        :param args: method args
        :param kwargs: method kwargs
        :return: same as method return (None when call permission is disabled)
        """
        logger.debug(f"Call_Model ({self.model}): {args} {kwargs}")
        if self.env._perm_call:
            return self.env._exec(self.model, method, args, kwargs)

    # ---- Read ----

    def search(self, domain: DomainT, offset: int = None, limit: int = None) -> List[int]:
        """ Searches a model for specific attributes and returns ids
        :param domain: filter
        :param offset: number to offset the returned items by
        :param limit: limit the number of items by a specified amount
        :return: list of record ids
        """
        # Only forward paging options that were actually supplied (0/None are omitted)
        conditions = {}
        if offset:
            conditions['offset'] = offset
        if limit:
            conditions['limit'] = limit

        logger.debug(f"Search ({self.model}): {domain}")
        return self.env._exec(self.model, 'search', [domain], conditions)

    def browse(self, ids: IdsT, fields: FieldsT) -> List[dict]:
        """ Reads the specified records and returns specified fields
        :param ids: id OR list of ids to read
        :param fields: list of fields to return
        :return: list of record dicts
        """
        if isinstance(ids, int):
            ids = [ids]

        logger.debug(f"Read ({self.model}): {ids}")
        # Grab Many-fields for post processing
        fields, many_fields = extract_many_fields(fields)
        result = self.env._exec(self.model, 'read', [ids], {'fields': fields})
        return apply_many_fields(self.env, result, many_fields)

    def search_browse(self, domain: DomainT, fields: FieldsT, offset: int = None, limit: int = None) -> List[dict]:
        """ Searches for records and returns the specified fields of each match
        :param domain: filter
        :param fields: list of fields to return
        :param offset: number to offset the returned items by
        :param limit: limit the number of items by a specified amount
        :return: list of record dicts
        """
        logger.debug(f"Search_Read ({self.model}): {domain}")
        # Grab Many-fields for post processing
        fields, many_fields = extract_many_fields(fields)

        options = {'fields': fields}
        if offset:
            options['offset'] = offset
        if limit:
            options['limit'] = limit

        result = self.env._exec(self.model, 'search_read', [domain], options)
        return apply_many_fields(self.env, result, many_fields)

    def search_count(self, domain: DomainT) -> int:
        """ Searches a model and returns the number of matching records
        :param domain: filter
        :return: number of matching records
        """
        logger.debug(f"Search_Count ({self.model}): {domain}")
        return self.env._exec(self.model, 'search_count', [domain])

    # ---- Write ----

    def create(self, fields: Dict[str, Any]) -> int:
        """ Creates a new record
        :param fields: field values to set in new record
            e.g.
            {'name': "Item 1", 'description': "Test 1"}
        :return: id of new record (None when write permission is disabled)
        """
        logger.info(f"Create ({self.model}): {fields}")
        if self.env._perm_write:
            return self.env._exec(self.model, 'create', [fields])

    def write(self, ids: IdsT, fields: Dict[str, Any]) -> bool:
        """ Updates existing records
        :param ids: id OR list of ids to update
        :param fields: dict of fields and their values
        :return: result of the remote write (None when write permission is disabled)
        """
        if isinstance(ids, int):
            ids = [ids]

        logger.info(f"Write ({self.model}): {ids} - {fields}")
        if self.env._perm_write:
            return self.env._exec(self.model, 'write', [ids, fields])

    def delete(self, ids: IdsT) -> bool:
        """ Deletes specified ids
        :param ids: id OR list of ids to delete
        :return: True if the records were deleted, False if Odoo reported that they
            do not exist or are still referenced (None when write permission is disabled)
        :raises xmlrpc.client.Fault: for any other fault reported by Odoo
        """
        if isinstance(ids, int):
            ids = [ids]

        logger.info(f"Unlink ({self.model}): {ids}")
        try:
            if self.env._perm_write:
                # The id list must be wrapped so it travels as ONE positional
                # argument (same shape as write/call); passing it unwrapped
                # spreads the ids across separate arguments.
                return self.env._exec(self.model, 'unlink', [ids]) or False
        except xmlrpc.client.Fault as e:
            # Record doesn't exist
            if e.faultCode == 2 and 'not exist' in e.faultString:
                logger.error(f"Could not delete records {ids}: does not exist")
                return False
            # Linked to other records
            elif e.faultCode == 1 and 'If possible, archive it instead' in e.faultString:
                logger.error(f"Could not delete records {ids}: other records rely on these")
                return False
            raise
class Odoo:
    """ XML-RPC connection to one Odoo database

    Index the instance with a model name to get a Model proxy,
    e.g. ``odoo['res.partner'].search([...])``.
    """

    def __init__(self, database: str, username: str, password: str, url: str, port: int, perm_write=True, perm_call=True):
        """ Create connection
        :param database: name of the Odoo database
        :param username: login used to authenticate
        :param password: password (or API key) used to authenticate
        :param url: base url including scheme, without port
        :param port: port the Odoo server listens on
        :param perm_write: when False, Model write operations are logged but not sent
        :param perm_call: when False, Model method calls are logged but not sent
        """
        self.db = database
        self.username = username
        self.password = password
        self.port = port

        self.url_common = f"{url}:{port}/xmlrpc/2/common"
        self.url_models = f"{url}:{port}/xmlrpc/2/object"
        self.odoo_common = xmlrpc.client.ServerProxy(self.url_common)
        self.odoo_models = xmlrpc.client.ServerProxy(self.url_models)

        if perm_write:
            logger.warning(f"Write permissions ENABLED for {database}")
        else:
            logger.warning(f"Write permissions DISABLED for {database}")
        if perm_call:
            logger.warning(f"Call permissions ENABLED for {database}")
        else:
            logger.warning(f"Call permissions DISABLED for {database}")
        self._perm_write = perm_write
        self._perm_call = perm_call

        self._connect()

    def _connect(self):
        """ Connect and authenticate with Odoo

        Retries with a growing delay (5s steps, capped at 30s) while the server
        refuses the connection, times out, or the host name cannot be resolved
        for a reason that may be temporary. Configuration errors are raised.

        :raises xmlrpc.client.Fault: database not found, or any other server fault
        :raises socket.gaierror: the host in the url does not exist
        :raises ssl.SSLError: SSL failure other than an http/https mismatch
        :raises Exception: http/https mismatch between url and port
        """
        logger.info(f"Connecting to Odoo: {self.db}")
        self.uid = None
        retry_delay = 5
        # "Host not found" is 11001 on Windows; other platforms expose their own
        # value as socket.EAI_NONAME.
        host_not_found = {11001, getattr(socket, 'EAI_NONAME', 11001)}

        while self.uid is None:
            try:
                self.uid = self.odoo_common.authenticate(
                    self.db,
                    self.username,
                    self.password,
                    {}
                )
            except ConnectionRefusedError:
                logger.critical(
                    f"Connection refused! (cannot access Odoo on port {self.port}) {self.url_common} Trying again in {retry_delay} seconds.")
            except TimeoutError:
                logger.critical(f"Connection timed out! {self.url_common} Trying again in {retry_delay} seconds.")
            # Database not found
            except xmlrpc.client.Fault as e:
                if f'database "{self.db}" does not exist' in e.faultString:
                    raise xmlrpc.client.Fault(e.faultCode, f"Database not found: {self.db}") from e
                raise
            # URL error
            except socket.gaierror as e:
                if e.errno in host_not_found:
                    raise socket.gaierror(e.errno, f"Bad url: {self.url_common}") from e
                # Other resolver errors may be temporary: report and retry
                # instead of looping silently.
                logger.critical(
                    f"Could not resolve host ({e})! {self.url_common} Trying again in {retry_delay} seconds.")
            # Using https on http port
            except ssl.SSLError as e:
                if e.reason == 'WRONG_VERSION_NUMBER':
                    raise Exception("Bad SSL (Probably need http. Are you using https?)") from e
                raise
            # using http on https port
            except xmlrpc.client.ProtocolError as e:
                raise Exception("ProtocolError (e.g. make sure you're using https on an https port)") from e
            else:
                if self.uid is not None:
                    if self.uid:
                        logger.info("Connection successful!")
                    else:
                        # A falsy uid means the server answered but rejected the
                        # login; later requests will fail in _exec.
                        logger.error(f"Authentication failed for {self.username} on {self.db}")
                    # Done: do not sleep after a completed attempt
                    break

            time.sleep(retry_delay)
            retry_delay = min(30, retry_delay + 5)

    def _exec(self, *args):
        """ Abstracted communication with Odoo
        :param args: model name, method name, positional args list and optional kwargs dict
        :return: whatever the remote method returns
        :raises xmlrpc.client.Fault: on server faults (bad credentials get a clearer message)
        """
        try:
            return self.odoo_models.execute_kw(
                self.db,
                self.uid,
                self.password,
                *args
            )
        except xmlrpc.client.Fault as e:
            if 'security.check(db,uid,passwd)' in e.faultString:
                raise xmlrpc.client.Fault(e.faultCode, "Wrong username or password!") from e
            raise
        except Exception as e:
            logger.error(f"Error in _exec(): {args}\n{e}")
            raise

    def __getitem__(self, model: str) -> Model:
        """ Returns a Model proxy for the given model name """
        return Model(self, model)
def extract_many_fields(fields: FieldsT) -> Tuple[List[str], List[x2m]]:
    """ Separate 'string' fields from 'Many' fields

    Every 'many' field requires a call to Odoo AFTER the parent model call

    EXAMPLE:
    A call to Odoo using a 'many' field would work something like this:
        fields_to_get = ['name', m2o('partner_id', 'res.partner', ['name', 'email'])]
        customer = odoo['sale.order'].browse(5, fields_to_get)
    The following operations will be performed:
        Fetch sale.order id=5 with fields 'name' and 'partner_id'
        Fetch res.partner ids=(determined by previous fetch) with fields 'name' and 'email'
        Merge res.partner records into the sale.order record

    The input list is left untouched so the same field selection can be reused
    for further calls.

    :param fields: list of field names and/or x2m descriptors
    :return: Tuple (fields_list, many_fields_list) where fields_list is a new list
        containing only field names, in the original order
    """
    field_names = []
    many_fields = []
    for field in fields:
        if isinstance(field, x2m):
            # Save Many-field for post-processing and request it by name
            many_fields.append(field)
            field_names.append(field.field_name)
        else:
            field_names.append(field)
    return field_names, many_fields
def apply_many_fields(odoo: Odoo, fetched_records: List[dict], many_fields: List[x2m]) -> List[dict]:
    """ Fetches the related records for each 'Many' field and merges them in
    :param odoo: used to fetch new data from odoo
    :param fetched_records: recordset to update fields
    :param many_fields: list of 'Many' objects
    :return: modified fetched_records
    """
    for many_field in many_fields:
        # Gather list of ids to fetch
        ids = many_field.gather_ids_to_fetch(fetched_records)

        # Fetch record dicts; skip the remote call when nothing is referenced
        related = odoo[many_field.model].browse(ids, many_field.fields) if ids else []

        # Organize records into a dict (key=id)
        records_by_id = {record['id']: record for record in related}

        # Apply the related records to each record in fetched_records
        fetched_records = many_field.field_to_recordset(fetched_records, records_by_id)
    return fetched_records