refactor: rest client #217
Pull request overview
This PR refactors the write path to use a new synchronous RestClient (urllib3-based) instead of the previous generated client/write service wiring, and updates InfluxDBClient3 construction accordingly.
Changes:
- Refactors `WriteApi` to issue HTTP requests via a new `RestClient`, adds gzip decision/compression utilities, and adds endpoint/exception translation logic.
- Introduces `influxdb_client_3/write_client/_sync/rest_client.py` to encapsulate urllib3 request/response handling.
- Updates `InfluxDBClient3` initialization to pass write connection details (base_url/auth/gzip) directly into `WriteApi`.
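The gzip decision mentioned in the overview can be pictured roughly as follows. This is a minimal sketch, not the PR's code: the function name `maybe_gzip` and the `gzip_threshold` parameter are hypothetical, and the only real API used is the standard library's `gzip.compress`.

```python
import gzip
from typing import Dict, Optional, Tuple


def maybe_gzip(body: bytes, enable_gzip: bool,
               gzip_threshold: Optional[int] = None) -> Tuple[bytes, Dict[str, str]]:
    """Return (payload, extra_headers) for a write request.

    Hypothetical helper: compress only when gzip is enabled and the body
    is at least `gzip_threshold` bytes (no threshold means always compress).
    """
    if not enable_gzip:
        return body, {}
    if gzip_threshold is not None and len(body) < gzip_threshold:
        return body, {}
    return gzip.compress(body), {"Content-Encoding": "gzip"}
```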
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 15 comments.
| File | Description |
|---|---|
| `influxdb_client_3/write_client/client/write_api.py` | Major refactor of write implementation to use a new REST client and custom request building/serialization paths. |
| `influxdb_client_3/write_client/_sync/rest_client.py` | Adds a new urllib3-based synchronous REST client abstraction used by WriteApi. |
| `influxdb_client_3/__init__.py` | Wires new write API constructor parameters (auth/gzip/base_url) and updates public kwargs documentation. |
2b26e5c to
7d74438
Compare
52cee20 to
a7076f9
Compare
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 12 comments.
Comments suppressed due to low confidence (2)
tests/test_write_api.py:251
`WriteApi.write()` takes parameters in `(bucket, org, record, ...)` order, but this test passes `(org, bucket, record)`. This makes the test less representative of real usage and can mask issues in how query parameters are constructed.
tests/test_write_api.py:293
This call passes `(org, bucket, record)` instead of `(bucket, org, record)`, which makes the timeout test less representative of real usage and inconsistent with other tests in this file.
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (4)
influxdb_client_3/write_client/client/write_api.py:1171
`__getstate__` deletes `_write_service`, but `WriteApi` no longer defines that attribute. This will raise `KeyError` during pickling (and `del state['_subject']` / `del state['_disposable']` can also raise `KeyError` depending on instance state). Use `state.pop(..., None)` and remove the stale `_write_service` reference.

    def __getstate__(self):
        """Return a dict of attributes that you want to pickle."""
        state = self.__dict__.copy()
        # Remove rx
        del state['_subject']
        del state['_disposable']
        del state['_write_service']
        return state

influxdb_client_3/write_client/client/write_api.py:1181
`__setstate__` calls `self.__init__(self._write_options, self._point_settings, ...)`, but `WriteApi.__init__` requires `token`, `bucket`, and `org` as the first arguments. As written, unpickling will raise `TypeError`. Recreate the Rx batching pipeline based on the restored `_write_options` instead of re-calling `__init__` with the wrong signature.

    def __setstate__(self, state):
        """Set your object with the provided dict."""
        self.__dict__.update(state)
        # Init Rx
        self.__init__(self._write_options,
                      self._point_settings,
                      success_callback=self._success_callback,
                      error_callback=self._error_callback,
                      retry_callback=self._retry_callback)

tests/test_write_api.py:252
`WriteApi.write()` expects positional arguments in the order `(bucket, org, record)`. This call passes org first, which swaps the query parameters and makes the test less representative of real usage. Use keyword arguments (or correct positional order) to avoid accidental swaps.
tests/test_write_api.py:293
`WriteApi.write()` positional argument order is `(bucket, org, record)`. This test passes org then bucket, which is easy to get wrong and can hide routing/query-param bugs. Prefer keyword arguments to make the intent explicit (and keep `_request_timeout` behavior under test).
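The argument-order concern in the two test comments above can be illustrated with a stand-in function. This is a sketch only: `write` here is a hypothetical stub with the `(bucket, org, record)` positional order described in the review, not the real `WriteApi.write()`.

```python
def write(bucket, org, record):
    """Hypothetical stub: return the query params a write request would carry."""
    return {"bucket": bucket, "org": org}


# Positional call with org first: the values are silently swapped.
swapped = write("my-org", "my-bucket", "cpu usage=1")

# Keyword call: the order at the call site no longer matters.
explicit = write(org="my-org", bucket="my-bucket", record="cpu usage=1")
```

Because both values are plain strings, the swapped call does not fail on its own, which is why keyword arguments make the intent easier to verify in a test.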
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (5)
tests/test_write_api.py:29
`mock_urllib3_timeout_request` is used as a `side_effect` for `urllib3._request_methods.RequestMethods.request`, which passes the `RequestMethods` instance as the first positional arg. The helper currently omits that parameter, so `method`/`url` are shifted and the test can behave incorrectly.
tests/test_write_api.py:252
`WriteApi.write()` takes `bucket` as the first positional argument and `org` as the second. This call passes them in the opposite order, so the request routing/query params will be wrong.
tests/test_write_api.py:293
`WriteApi.write()` takes `bucket` as the first positional argument and `org` as the second. This call passes them in the opposite order, so it won't exercise the intended timeout behavior for the right endpoint params.
influxdb_client_3/write_client/client/write_api.py:1171
`__getstate__` deletes `state['_write_service']`, but this class no longer defines `_write_service`. Pickling will raise `KeyError` here.

        state = self.__dict__.copy()
        # Remove rx
        del state['_subject']
        del state['_disposable']
        del state['_write_service']
        return state

influxdb_client_3/write_client/client/write_api.py:1181
`__setstate__` calls `self.__init__(self._write_options, self._point_settings, ...)`, but `__init__` now requires `token`, `bucket`, `org`, and other constructor args. This will break unpickling; it should just recreate the Rx batching pipeline based on the restored `_write_options`.

    def __setstate__(self, state):
        """Set your object with the provided dict."""
        self.__dict__.update(state)
        # Init Rx
        self.__init__(self._write_options,
                      self._point_settings,
                      success_callback=self._success_callback,
                      error_callback=self._error_callback,
                      retry_callback=self._retry_callback)
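Whether the instance reaches a `side_effect` as the first positional argument (the concern raised for tests/test_write_api.py:29 above) depends on how the patch is created. The sketch below uses a hypothetical `Requester` class rather than urllib3: with a plain `patch.object`, the mock that replaces the method is not bound, so the helper sees only the call arguments; with `autospec=True`, the helper also receives the instance. Writing the helper as `(*args, **kwargs)` tolerates both.

```python
from unittest import mock


class Requester:
    """Hypothetical stand-in for a class with a request(method, url) method."""

    def request(self, method, url):
        return "real"


def record_args(*args, **kwargs):
    """Side effect that returns the positional arguments it was given."""
    return args


requester = Requester()

# Plain patch: the instance is NOT passed to the side effect.
with mock.patch.object(Requester, "request", side_effect=record_args):
    plain_args = requester.request("POST", "/write")

# autospec=True: the patched method is bound, so the instance comes first.
with mock.patch.object(Requester, "request", autospec=True,
                       side_effect=record_args):
    autospec_args = requester.request("POST", "/write")
```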
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (3)
tests/test_write_api.py:252
`WriteApi.write()` takes positional arguments in the order `(bucket, org, record)`. This call passes org first and bucket second, which swaps routing parameters and can hide bugs in header/query handling. Prefer keyword arguments (or swap the positionals) to make the intent unambiguous.
tests/test_write_api.py:293
Same positional-argument issue as above: this passes org first and bucket second to `WriteApi.write(bucket, org, record)`, which inverts query parameters. Using keywords also makes the `_request_timeout` intent clearer.
influxdb_client_3/write_client/client/write_api.py:1181
`__getstate__`/`__setstate__` are broken after the refactor: `_write_service` is no longer an attribute, so `del state['_write_service']` will raise `KeyError` during pickling. Additionally, `__setstate__` calls `__init__` with the wrong arguments for the new constructor signature (it no longer accepts just `write_options` and `point_settings`). This makes pickling/unpickling unusable.

    def __getstate__(self):
        """Return a dict of attributes that you want to pickle."""
        state = self.__dict__.copy()
        # Remove rx
        del state['_subject']
        del state['_disposable']
        del state['_write_service']
        return state

    def __setstate__(self, state):
        """Set your object with the provided dict."""
        self.__dict__.update(state)
        # Init Rx
        self.__init__(self._write_options,
                      self._point_settings,
                      success_callback=self._success_callback,
                      error_callback=self._error_callback,
                      retry_callback=self._retry_callback)
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (3)
influxdb_client_3/write_client/client/write_api.py:1181
`__getstate__` deletes `state['_write_service']`, but `WriteApi` no longer defines `_write_service`, so pickling will raise `KeyError`. Additionally, `__setstate__` calls `__init__` with the wrong signature (it now requires `token`, `bucket`, `org`, etc.), so unpickling will fail even if `__getstate__` succeeds. Safer approach: `pop()` optional fields and recreate the Rx pipeline based on `_write_options` without re-calling `__init__`.

    def __getstate__(self):
        """Return a dict of attributes that you want to pickle."""
        state = self.__dict__.copy()
        # Remove rx
        del state['_subject']
        del state['_disposable']
        del state['_write_service']
        return state

    def __setstate__(self, state):
        """Set your object with the provided dict."""
        self.__dict__.update(state)
        # Init Rx
        self.__init__(self._write_options,
                      self._point_settings,
                      success_callback=self._success_callback,
                      error_callback=self._error_callback,
                      retry_callback=self._retry_callback)
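The safer approach suggested above might look like the following. It is a sketch on a hypothetical `BatchingWriter` class, not the actual `WriteApi`: the attribute names `_subject` and `_disposable` come from the quoted code, while `_init_pipeline` and the placeholder pipeline objects are assumptions made for illustration.

```python
class BatchingWriter:
    """Hypothetical stand-in for a writer that holds unpicklable pipeline state."""

    def __init__(self, token, bucket, org, write_options=None):
        self._token = token
        self._bucket = bucket
        self._org = org
        self._write_options = write_options
        self._init_pipeline()

    def _init_pipeline(self):
        # Placeholder for building the Rx batching pipeline from _write_options.
        # Lambdas are used here only because they cannot be pickled.
        self._subject = lambda item: item
        self._disposable = lambda: None

    def __getstate__(self):
        """Return picklable state, dropping runtime-only attributes if present."""
        state = self.__dict__.copy()
        for name in ("_subject", "_disposable"):
            state.pop(name, None)  # no KeyError when the attribute is absent
        return state

    def __setstate__(self, state):
        """Restore attributes, then rebuild the pipeline without calling __init__."""
        self.__dict__.update(state)
        self._init_pipeline()
```

With these two methods in place, a round trip such as `pickle.loads(pickle.dumps(writer))` no longer depends on the constructor signature or on which runtime attributes happen to exist.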
tests/test_write_api.py:252
`WriteApi.write()` expects positional arguments as `(bucket, org, record, ...)`, but this test passes them as `(org, bucket, record)`. That swaps query params and makes the test exercise the wrong behavior. Use keyword args (or correct positional order) to match the `write()` signature.
tests/test_write_api.py:294
This `write()` call also swaps `(org, bucket)` positional order. Since the assertion is about timeout propagation rather than parameter order, using keyword args avoids accidentally testing the wrong thing.
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (2)
tests/test_write_api.py:252
In this test, `WriteApi.write()` expects positional args in the order `(bucket, org, record)`. Passing org first swaps the routing parameters and can hide bugs in request construction/error handling.
tests/test_write_api.py:293
`WriteApi.write()` positional parameters are `(bucket, org, record)`. This call passes org first, which makes the test exercise the wrong request routing.
87ccdef to
273b4ce
Compare
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (3)
influxdb_client_3/write_client/client/write_api.py:1179
`__getstate__()` deletes `_write_service`, but `WriteApi` no longer sets this attribute, so pickling will raise `KeyError`. Additionally, `__setstate__()` calls `__init__()` with the wrong signature (it now requires `token`/`bucket`/`org`/...).

    def __getstate__(self):
        """Return a dict of attributes that you want to pickle."""
        state = self.__dict__.copy()
        # Remove rx
        del state['_subject']
        del state['_disposable']
        del state['_write_service']
        return state

    def __setstate__(self, state):
        """Set your object with the provided dict."""
        self.__dict__.update(state)
        # Init Rx
        self.__init__(self._write_options,
                      self._point_settings,
                      success_callback=self._success_callback,
                      error_callback=self._error_callback,
                      retry_callback=self._retry_callback)

tests/test_write_api.py:252
Argument order is swapped in this `write()` call: `WriteApi.write(bucket, org, record, ...)` expects bucket first, org second, but this passes org then bucket. This makes the test less meaningful and can hide real regressions.
tests/test_write_api.py:293
Argument order is swapped in this `write()` call: `WriteApi.write(bucket, org, record, ...)` expects bucket first, org second. As written, the test exercises the wrong parameter mapping (and still "works" only because values are strings).
d2ad939 to
bc12a98
Compare
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (2)
tests/test_write_api.py:252
`WriteApi.write()` takes arguments in `(bucket, org, record)` order. This test currently passes `(org, bucket, record)`, which swaps query params and can hide regressions in request construction.
tests/test_write_api.py:294
Same issue as above: the positional args to `write()` are reversed here; the test should pass `bucket` first and `org` second (or use keyword args) so it actually exercises the correct URL/query-param behavior.
bednar left a comment
Thanks for the refactor. I found a few runtime regressions in the new write REST path that should be fixed before this lands.
    If the method is called asynchronously,
    returns the request thread.
    """  # noqa: E501
    local_var_params, path, path_params, query_params, header_params, body_params = \
call_api() is still synchronous: when async_req is false it returns a RESTResponse, and when it is true it returns a thread-like ApplyResult. Awaiting it here makes the real success path fail with TypeError: object RESTResponse can't be used in 'await' expression; the current test hides this by replacing call_api with an AsyncMock. This method needs either a real async implementation or should stop being declared/used as an async coroutine.
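The failure mode described here can be reproduced in isolation. The sketch below uses hypothetical `FakeResponse` and `FakeApi` classes (not the client's real ones): awaiting the return value of a synchronous method raises `TypeError`, while swapping that method for `unittest.mock.AsyncMock` makes the same `await` succeed, which is how a test can hide the problem.

```python
import asyncio
from unittest.mock import AsyncMock


class FakeResponse:
    """Hypothetical plain response object; it is not awaitable."""
    status = 204


class FakeApi:
    def call_api(self):
        # Synchronous: returns the response directly, not a coroutine.
        return FakeResponse()

    async def post_write(self):
        # Bug under discussion: awaiting a non-awaitable return value.
        return await self.call_api()


def run_real_path():
    """Return the exception raised when the synchronous call_api is awaited."""
    try:
        asyncio.run(FakeApi().post_write())
    except TypeError as exc:
        return exc
    return None


def run_mocked_path():
    """Return the response obtained when call_api is replaced by an AsyncMock."""
    api = FakeApi()
    api.call_api = AsyncMock(return_value=FakeResponse())
    return asyncio.run(api.post_write())
```

A test that exercises the unmocked path at least once, for example against a stubbed HTTP layer instead of a mocked `call_api`, would surface the error.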
When running the integration test e.g. A couple of observations:
@bednar do you have an opinion regarding this?
karel-rehor left a comment
I see Jakub is reporting some of the same problems. Review to continue.
5548816 to
2825235
Compare
karel-rehor left a comment
I still have a few files to look at. I've been investigating the segmentation fault further, but have no new information. There are some requests in the files I've reviewed so far.
    list_results = reader.to_pylist()
    self.assertEqual(data_size, len(list_results))

    @pytest.mark.skipif(running_on_circleci, reason="Skipping this test on CircleCI")
When running this test locally, I'm getting segmentation faults.
Fatal Python error: Segmentation fault
...
Current thread 0x00007b84b5d97b80 (most recent call first):
File "/home/karl/bonitoo/prjs/github.com/influxCommunity/influxdb3-python/influxdb_client_3/query/query_api.py", line 298 in close
File "/home/karl/bonitoo/prjs/github.com/influxCommunity/influxdb3-python/influxdb_client_3/write_client/client/util/multiprocessing_helper.py", line 164 in run
File "/home/karl/.pyenv/versions/3.12.7/lib/python3.12/multiprocessing/process.py", line 314 in _bootstrap
File "/home/karl/.pyenv/versions/3.12.7/lib/python3.12/multiprocessing/popen_fork.py", line 71 in _launch
Skipping a test specifically in CircleCI without a clear reason looks suspicious. It is possible that the test can run successfully locally in one environment, but then fail in another. When it fails the underlying cause needs to be addressed.
I have a further comment on this in the discussion part of this PR.
I skipped it for CI only because I thought the failure in CI was related to how multiprocessing works in the CI environment.
It appears that one Copilot recommendation in this file was applied, but it remains unresolved. Can this be marked as resolved?
    # Close and set _query_api to None because query_api is not needed in this process.
    # We only need write_api.
    self.client._query_api.close()
This is the second-to-last call in the stack when I get the segmentation fault. It appears to come from the internal Arrow Flight client.
e.g.
Fatal Python error: Segmentation fault
...
Current thread 0x00007b9fa357eb80 (most recent call first):
File "/home/karl/bonitoo/prjs/github.com/influxCommunity/influxdb3-python/influxdb_client_3/query/query_api.py", line 298 in close
File "/home/karl/bonitoo/prjs/github.com/influxCommunity/influxdb3-python/influxdb_client_3/write_client/client/util/multiprocessing_helper.py", line 164 in run
File "/home/karl/.pyenv/versions/3.12.7/lib/python3.12/multiprocessing/process.py", line 314 in _bootstrap
File "/home/karl/.pyenv/versions/3.12.7/lib/python3.12/multiprocessing/popen_fork.py", line 71 in _launch
This requires further investigation.
    try:
        return await self.call_api(
            resource_path=path,
            method='POST',
            query_params=query_params,
            header_params=header_params,
            body=body,
            async_req=local_var_params.get('async_req'),
            _request_timeout=local_var_params.get('_request_timeout'),
            urlopen_kw=kwargs.get('urlopen_kw', None))
    except ApiException as e:
        raise self._translate_write_exception(e, use_v2_api)
ed7c50a to
a7b4d07
Compare
+AI+ says (about SIGSEGV): • Cause is almost certainly native gRPC/Arrow Flight state being used after Exact flow in this PR:
and segfaults in native code. Why SIGSEGV instead of Python error: So: not “bad line protocol”, not the REST write path directly. The trigger is closing an unnecessary Flight client in the child. The deeper cause is mixing Linux fork() multiprocessing with Arrow Flight/gRPC native client lifecycle. Relevant docs:
Q: Would not creating Query API in "write-only" client help? If the write-only path never creates It also improves design: a writer should not initialize the query stack. In this PR, A better fix is either: or, preferably, lazy query creation so normal users do not need a flag: Then --- BUT --- Q: Is this helper any good, anyway??
My recommendation: remove it from core public API in this refactor, or deprecate it and move the pattern to If you want to keep functionality, a better design is not this helper. Use a simple documented pattern:
That aligns with @karel-rehor's concern: removing it is reasonable because it is not core, appears lightly used, and creates real native-process risk.
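Lazy query creation, as suggested above, could be sketched like this. Everything here is hypothetical (`LazyClient`, `query_factory`, and whatever object the factory returns); the point is only that a write-only process never constructs the query stack, so it has nothing to close.

```python
class LazyClient:
    """Hypothetical client that builds its query API only on first use."""

    def __init__(self, query_factory):
        self._query_factory = query_factory  # callable that builds the query API
        self._query_api = None

    @property
    def query_api(self):
        # Create the (potentially native-backed) query API on demand.
        if self._query_api is None:
            self._query_api = self._query_factory()
        return self._query_api

    def close(self):
        # Only close what was actually created.
        if self._query_api is not None:
            close = getattr(self._query_api, "close", None)
            if callable(close):
                close()
            self._query_api = None
```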
OK, after researching why the segmentation fault occurs on Linux but apparently not on Darwin, I came across the +AI+ suggestion that Darwin often uses the "spawn" start method by default in multiprocessing, while Linux relies on the traditional "fork" approach. In lines 121-132 ...
    def __init__(self, **kwargs) -> None:
        """
        Initialize defaults.
        For more information how to initialize the writer see the examples above.
        :param kwargs: arguments are passed into ``__init__`` function of ``InfluxDBClient`` and ``write_api``.
        """
        multiprocessing.set_start_method("spawn", force=True)
        multiprocessing.Process.__init__(self)
        self.kwargs = kwargs
... 🎉 The integration test It still does not address all of the concerns mentioned by me earlier or in Ales's post. I still prefer that this be removed and added as an advanced example. We might now consider calling the

N.B. After adding the above change to my local fork, I see that the integration test now passes in CircleCI without issue.

Addendum: further documentation is at https://docs.python.org/3.14/library/multiprocessing.html#contexts-and-start-methods. Note that according to this documentation, "set_start_method() should not be used more than once in the program." So perhaps my suggestion here is not the best solution or workaround, but it is an indicator of how this might be fixed. By the way, the multiprocessing documentation mentions that "spawn... (is) the default on Windows and macOS".
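One way to avoid the global `set_start_method()` call discussed above is a per-use context from `multiprocessing.get_context()`, which is part of the standard library. The sketch below is an assumption about how a helper could be structured, not the PR's code; `start_worker` is a hypothetical name.

```python
import multiprocessing


def start_worker(target, *args):
    """Start `target(*args)` in a child created with the "spawn" start method.

    A context object selects the start method only for processes created
    through it, so the program-wide default is left untouched.
    `target` and `args` must be picklable, as "spawn" requires.
    """
    ctx = multiprocessing.get_context("spawn")
    process = ctx.Process(target=target, args=args)
    process.start()
    return process
```

A caller would typically invoke `start_worker(...)` under an `if __name__ == "__main__":` guard and `join()` the returned process.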
Closes #216
Proposed Changes
Summarize
Removed features:
Checklist