Skip to content

Commit 922477f

Browse files
authored
feat: add MultiprocessingWriter to help user write data in independent OS process (#356)
1 parent 929ffc9 commit 922477f

File tree

5 files changed

+296
-1
lines changed

5 files changed

+296
-1
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
- `BucketsApi` - add possibility to: `update`
66
- `OrganizationsApi` - add possibility to: `update`
77
- `UsersApi` - add possibility to: `update`, `delete`, `find`
8+
1. [#356](https://github.com/influxdata/influxdb-client-python/pull/356): Add `MultiprocessingWriter` to write data in independent OS process
89

910
### Bug Fixes
1011
1. [#359](https://github.com/influxdata/influxdb-client-python/pull/359): Correct serialization empty columns into LineProtocol [DataFrame]

docs/api.rst

+12
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,15 @@ DeleteApi
6969

7070
.. autoclass:: influxdb_client.domain.DeletePredicateRequest
7171
:members:
72+
73+
Helpers
74+
"""""""
75+
.. autoclass:: influxdb_client.client.util.date_utils.DateHelper
76+
:members:
77+
78+
.. autoclass:: influxdb_client.client.util.date_utils_pandas.PandasDateTimeHelper
79+
:members:
80+
81+
.. autoclass:: influxdb_client.client.util.multiprocessing_helper.MultiprocessingWriter
82+
:members:
83+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
"""
2+
Helpers classes to make easier use the client in multiprocessing environment.
3+
4+
For more information how the multiprocessing works see Python's
5+
`reference docs <https://docs.python.org/3/library/multiprocessing.html>`_.
6+
"""
7+
import logging
8+
import multiprocessing
9+
10+
from influxdb_client import InfluxDBClient, WriteOptions
11+
from influxdb_client.client.exceptions import InfluxDBError
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
def _success_callback(conf: (str, str, str), data: str):
    """Log a successfully written batch.

    Default ``success_callback`` used by :class:`MultiprocessingWriter` when
    the caller does not supply one.

    :param conf: batch configuration tuple — presumably (bucket, org, precision); confirm against ``WriteApi``
    :param data: the serialized batch payload that was written
    """
    # Lazy %-style arguments: the message is only formatted when DEBUG logging
    # is actually enabled (an f-string would be formatted unconditionally).
    logger.debug("Written batch: %s, data: %s", conf, data)
19+
20+
21+
def _error_callback(conf: (str, str, str), data: str, exception: InfluxDBError):
    """Log a batch that could not be written.

    Default ``error_callback`` used by :class:`MultiprocessingWriter` when
    the caller does not supply one.

    :param conf: batch configuration tuple — presumably (bucket, org, precision); confirm against ``WriteApi``
    :param data: the serialized batch payload that failed
    :param exception: the non-retryable error raised by the write
    """
    # Lazy %-style arguments: formatting is skipped unless DEBUG is enabled.
    logger.debug("Cannot write batch: %s, data: %s due: %s", conf, data, exception)
24+
25+
26+
def _retry_callback(conf: (str, str, str), data: str, exception: InfluxDBError):
    """Log a batch write failure that will be retried.

    Default ``retry_callback`` used by :class:`MultiprocessingWriter` when
    the caller does not supply one.

    :param conf: batch configuration tuple — presumably (bucket, org, precision); confirm against ``WriteApi``
    :param data: the serialized batch payload being retried
    :param exception: the retryable error raised by the write
    """
    # Lazy %-style arguments: formatting is skipped unless DEBUG is enabled.
    logger.debug("Retryable error occurs for batch: %s, data: %s retry: %s", conf, data, exception)
29+
30+
31+
class _PoisonPill:
32+
"""To notify process to terminate."""
33+
34+
pass
35+
36+
37+
class MultiprocessingWriter(multiprocessing.Process):
    """
    The Helper class to write data into InfluxDB in independent OS process.

    Example:
        .. code-block:: python

            from influxdb_client import WriteOptions
            from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


            def main():
                writer = MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                               write_options=WriteOptions(batch_size=100))
                writer.start()

                for x in range(1, 1000):
                    writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")

                writer.__del__()


            if __name__ == '__main__':
                main()


    How to use with context_manager:
        .. code-block:: python

            from influxdb_client import WriteOptions
            from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


            def main():
                with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                           write_options=WriteOptions(batch_size=100)) as writer:
                    for x in range(1, 1000):
                        writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")


            if __name__ == '__main__':
                main()


    How to handle batch events:
        .. code-block:: python

            from influxdb_client import WriteOptions
            from influxdb_client.client.exceptions import InfluxDBError
            from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


            class BatchingCallback(object):

                def success(self, conf: (str, str, str), data: str):
                    print(f"Written batch: {conf}, data: {data}")

                def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
                    print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

                def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
                    print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


            def main():
                callback = BatchingCallback()
                with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                           success_callback=callback.success,
                                           error_callback=callback.error,
                                           retry_callback=callback.retry) as writer:

                    for x in range(1, 1000):
                        writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")


            if __name__ == '__main__':
                main()

    """

    # Lifecycle flags. Class-level defaults; ``start()`` and ``__del__()``
    # overwrite them with instance attributes once the writer's state changes.
    __started__ = False
    __disposed__ = False

    def __init__(self, **kwargs) -> None:
        """
        Initialize defaults.

        For more information how to initialize the writer see the examples above.

        :param kwargs: arguments are passed into ``__init__`` function of ``InfluxDBClient`` and ``write_api``.
        """
        multiprocessing.Process.__init__(self)
        # kwargs are stored and only unpacked in ``run()`` — i.e. the client is
        # created inside the child process, not in the parent.
        self.kwargs = kwargs
        self.client = None
        self.write_api = None
        # Manager-backed queue so the parent process and the spawned child share
        # one joinable queue (supports ``task_done()``/``join()`` used below).
        self.queue_ = multiprocessing.Manager().Queue()

    def write(self, **kwargs) -> None:
        """
        Append time-series data into underlying queue.

        For more information how to pass arguments see the examples above.

        :param kwargs: arguments are passed into ``write`` function of ``WriteApi``
        :return: None
        """
        # NOTE(review): plain ``assert`` statements are stripped under ``python -O``;
        # the exact message text is part of the observable behavior (asserted in tests).
        assert self.__disposed__ is False, 'Cannot write data: the writer is closed.'
        assert self.__started__ is True, 'Cannot write data: the writer is not started.'
        self.queue_.put(kwargs)

    def run(self):
        """Initialize ``InfluxDBClient`` and waits for data to writes into InfluxDB."""
        # Runs in the CHILD process (invoked via ``multiprocessing.Process.start()``).
        # Initialize Client and Write API
        self.client = InfluxDBClient(**self.kwargs)
        # Fall back to module-level logging callbacks when the user supplied none.
        self.write_api = self.client.write_api(write_options=self.kwargs.get('write_options', WriteOptions()),
                                               success_callback=self.kwargs.get('success_callback', _success_callback),
                                               error_callback=self.kwargs.get('error_callback', _error_callback),
                                               retry_callback=self.kwargs.get('retry_callback', _retry_callback))
        # Infinite loop - until poison pill
        while True:
            next_record = self.queue_.get()
            if type(next_record) is _PoisonPill:
                # Poison pill means break the loop
                self.terminate()
                self.queue_.task_done()
                break
            self.write_api.write(**next_record)
            self.queue_.task_done()

    def start(self) -> None:
        """Start independent process for writing data into InfluxDB."""
        super().start()
        self.__started__ = True

    def terminate(self) -> None:
        """
        Cleanup resources in independent process.

        This function **cannot be used** to terminate the ``MultiprocessingWriter``.
        If you want to finish your writes please call: ``__del__``.
        """
        # Called inside the child process from ``run()``; flushes pending batches
        # and disposes the client. Safe to call repeatedly (guards below).
        if self.write_api:
            logger.info("flushing data...")
            self.write_api.__del__()
            self.write_api = None
        if self.client:
            self.client.__del__()
            self.client = None
        logger.info("closed")

    def __enter__(self):
        """Enter the runtime context related to this object."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the runtime context related to this object."""
        self.__del__()

    def __del__(self):
        """Dispose the client and write_api."""
        if self.__started__:
            # Ordering matters: enqueue the poison pill, wait until the child has
            # drained the queue (``task_done`` for every item), then join the
            # child process itself.
            self.queue_.put(_PoisonPill())
            self.queue_.join()
            self.join()
            self.queue_ = None
        self.__started__ = False
        self.__disposed__ = True

influxdb_client/client/write_api.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -565,4 +565,9 @@ def __setstate__(self, state):
565565
"""Set your object with the provided dict."""
566566
self.__dict__.update(state)
567567
# Init Rx
568-
self.__init__(self._influxdb_client, self._write_options, self._point_settings)
568+
self.__init__(self._influxdb_client,
569+
self._write_options,
570+
self._point_settings,
571+
success_callback=self._success_callback,
572+
error_callback=self._error_callback,
573+
retry_callback=self._retry_callback)

tests/test_MultiprocessingWriter.py

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import os
2+
import unittest
3+
from datetime import datetime
4+
5+
from influxdb_client import WritePrecision, InfluxDBClient
6+
from influxdb_client.client.util.date_utils import get_date_helper
7+
from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter
8+
from influxdb_client.client.write_api import SYNCHRONOUS
9+
10+
11+
# noinspection PyMethodMayBeStatic
class MultiprocessingWriterTest(unittest.TestCase):
    """Integration tests for ``MultiprocessingWriter`` against a live InfluxDB."""

    def setUp(self) -> None:
        """Read connection settings from the environment (with local defaults)."""
        self.url = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086")
        self.token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")
        self.org = os.getenv('INFLUXDB_V2_ORG', "my-org")
        self.writer = None

    def tearDown(self) -> None:
        """Dispose the writer created by a test, if any."""
        if self.writer:
            self.writer.__del__()

    def test_write_without_start(self):
        self.writer = MultiprocessingWriter(url=self.url, token=self.token, org=self.org,
                                            write_options=SYNCHRONOUS)

        with self.assertRaises(AssertionError) as ctx:
            self.writer.write(bucket="my-bucket", record="mem,tag=a value=5")

        self.assertEqual('Cannot write data: the writer is not started.', str(ctx.exception))

    def test_write_after_terminate(self):
        self.writer = MultiprocessingWriter(url=self.url, token=self.token, org=self.org,
                                            write_options=SYNCHRONOUS)
        self.writer.start()
        self.writer.__del__()

        with self.assertRaises(AssertionError) as ctx:
            self.writer.write(bucket="my-bucket", record="mem,tag=a value=5")

        self.assertEqual('Cannot write data: the writer is closed.', str(ctx.exception))

    def test_terminate_twice(self):
        with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
            # Disposing/terminating repeatedly must be a no-op, not an error.
            writer.__del__()
            writer.terminate()
            writer.terminate()
            writer.__del__()

    def test_use_context_manager(self):
        with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
            self.assertIsNotNone(writer)

    def test_pass_parameters(self):
        # Unique measurement suffix so reruns do not collide with old data.
        unique = get_date_helper().to_nanoseconds(datetime.utcnow() - datetime.utcfromtimestamp(0))

        # write data
        with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
            writer.write(bucket="my-bucket", record=f"mem_{unique},tag=a value=5i 10", write_precision=WritePrecision.S)

        # query data
        with InfluxDBClient(url=self.url, token=self.token, org=self.org) as client:
            flux = f'from(bucket: "my-bucket") |> range(start: 0) |> filter(fn: (r) => r._measurement == "mem_{unique}")'
            tables = client.query_api().query(flux, self.org)

            record = tables[0].records[0]
            self.assertIsNotNone(record)
            self.assertEqual("a", record["tag"])
            self.assertEqual(5, record["_value"])
            self.assertEqual(get_date_helper().to_utc(datetime.utcfromtimestamp(10)), record["_time"])

0 commit comments

Comments
 (0)