PYTHON-5846 Consolidate CMAP, heartbeat, and SDAM telemetry into _telemetry.py #2907
blink1073 wants to merge 21 commits into
…emetry.py Add _CmapTelemetry, _HeartbeatTelemetry, and _SdamTelemetry classes to eliminate the repetitive if-enabled_for_cmap / if-logger.isEnabledFor boilerplate spread across pool.py, monitor.py, topology.py, and server.py.
- Rename _CmapTelemetry._log -> _emit_log for consistency with _CommandTelemetry
- Rename _HeartbeatTelemetry.apm_started -> started to match started/succeeded/failed lifecycle
- Rename _HeartbeatTelemetry.log_started -> emit_started_log to signal it is the deferred log-only half
- Replace is_sdam flag with separate publish/log bool parameters on _CmapTelemetry
_CmapTelemetry now owns connection-creation and checkout durations: connection_created() starts the clock, connection_ready() computes it; checkout_started() starts the clock, checkout_succeeded/failed() compute it. _HeartbeatTelemetry.started() starts the clock; failed() computes its own duration instead of receiving it as a parameter. Removes checkout_started_time from Pool._get_conn, _raise_if_not_ready, and _raise_wait_queue_timeout, and removes AsyncConnection.creation_time.
Use response.awaitable (ground truth from wire) rather than self._awaited (pre-computed before response) for ServerHeartbeatSucceededEvent.awaited. Also fix stale method references in _HeartbeatTelemetry docstring.
_CmapTelemetry, _HeartbeatTelemetry, and _SdamTelemetry now compute _should_publish and _should_log as properties that check listener state and logger level at call time rather than caching derived booleans in the constructor.
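The difference between caching the flags and computing them at call time can be sketched as follows. This is a minimal illustration, not the PR's code: `Listeners`, `CachedTelemetry`, `LiveTelemetry`, and the logger name are hypothetical stand-ins.

```python
import logging

_LOGGER = logging.getLogger("example.telemetry")  # hypothetical logger name


class Listeners:
    """Hypothetical stand-in for the driver's listener registry."""

    def __init__(self) -> None:
        self.enabled = False


class CachedTelemetry:
    """Caches the derived booleans once, in the constructor."""

    def __init__(self, listeners: Listeners) -> None:
        self._should_publish = listeners.enabled
        self._should_log = _LOGGER.isEnabledFor(logging.DEBUG)


class LiveTelemetry:
    """Re-checks listener state and logger level on every access."""

    def __init__(self, listeners: Listeners) -> None:
        self._listeners = listeners

    @property
    def _should_publish(self) -> bool:
        return self._listeners.enabled

    @property
    def _should_log(self) -> bool:
        return _LOGGER.isEnabledFor(logging.DEBUG)


listeners = Listeners()
_LOGGER.setLevel(logging.INFO)
cached = CachedTelemetry(listeners)
live = LiveTelemetry(listeners)

# State changes after construction: only the property-based object notices.
listeners.enabled = True
_LOGGER.setLevel(logging.DEBUG)
```

For a long-lived owner such as a pool, the property form keeps the flags accurate if the logger level changes later; a short-lived object can reasonably cache them.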
…to _telemetry.py
…y classes; document why long-lived classes use properties
…ion computation in _CmapTelemetry and _HeartbeatTelemetry
…g in _CmapTelemetry
| clientId=self._client._topology_id, | ||
| commandName=self._operation, | ||
| operationId=self._operation_id, | ||
| log_command_retry( |
What do you think about a helper method here to avoid duplicating the entire log_command_retry call except for is_write between this and the next usage?
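A helper of the kind suggested here might look like the sketch below. The field names `clientId`, `commandName`, and `operationId` come from the diff above; `log_retry_entry`, `RetryableOperation`, and the `isWrite` field are hypothetical stand-ins, since the real `log_command_retry` signature is not shown in this thread.

```python
from typing import Any

# Captured entries, so the sketch is observable without a logging setup.
entries: list[dict[str, Any]] = []


def log_retry_entry(**fields: Any) -> None:
    """Hypothetical stand-in for the shared retry-logging function."""
    entries.append(fields)


class RetryableOperation:
    """Hypothetical owner of the two call sites (read retry, write retry)."""

    def __init__(self, topology_id: str, operation: str, operation_id: int) -> None:
        self._topology_id = topology_id
        self._operation = operation
        self._operation_id = operation_id

    def _log_retry(self, is_write: bool) -> None:
        # The shared fields live here once; callers supply only is_write.
        log_retry_entry(
            clientId=self._topology_id,
            commandName=self._operation,
            operationId=self._operation_id,
            isWrite=is_write,  # assumed field name
        )


op = RetryableOperation("topology-1", "find", 7)
op._log_retry(is_write=False)
op._log_retry(is_write=True)
```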
| from pymongo.hello import Hello | ||
| from pymongo.lock import _async_create_lock | ||
| from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage | ||
| from pymongo.logger import _SDAM_LOGGER, _debug_log |
They were used; I missed one direct usage, which is now moved to _telemetry.py.
| should_publish = self._should_publish | ||
| # Always record start time: logging or publishing may be enabled by the time | ||
| # checkout_succeeded or checkout_failed is called to compute the duration. | ||
| self._checkout_start = time.monotonic() |
This is shared across all connections in the pool and is called outside of a lock, introducing a possible race condition that can cause timing inconsistencies.
Updated to keep the timing info in the pool class as it was before.
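The race can be reproduced deterministically with a fake clock. The sketch below is illustrative only: `SharedSlotTelemetry`, `PerCallTelemetry`, and `fake_clock` are hypothetical names, and the interleaving is written out by hand rather than produced by real threads or tasks.

```python
import itertools
from typing import Callable


def fake_clock() -> Callable[[], float]:
    """Deterministic clock that returns 0.0, 1.0, 2.0, ... on successive calls."""
    counter = itertools.count()
    return lambda: float(next(counter))


class SharedSlotTelemetry:
    """Racy: one start-time slot shared by every checkout on the pool."""

    def __init__(self, clock: Callable[[], float]) -> None:
        self._clock = clock
        self._checkout_start = 0.0

    def checkout_started(self) -> None:
        self._checkout_start = self._clock()

    def checkout_succeeded(self) -> float:
        return self._clock() - self._checkout_start


class PerCallTelemetry:
    """Safe: the caller holds its own start time and hands it back."""

    def __init__(self, clock: Callable[[], float]) -> None:
        self._clock = clock

    def checkout_started(self) -> float:
        return self._clock()

    def checkout_succeeded(self, checkout_start: float) -> float:
        return self._clock() - checkout_start


# Interleave two checkouts, A and B, as two threads or tasks might.
shared = SharedSlotTelemetry(fake_clock())
shared.checkout_started()               # A starts at t=0
shared.checkout_started()               # B starts at t=1 and overwrites A's slot
shared_a = shared.checkout_succeeded()  # A finishes at t=2 but is measured from t=1

per_call = PerCallTelemetry(fake_clock())
start_a = per_call.checkout_started()   # A starts at t=0
start_b = per_call.checkout_started()   # B starts at t=1
per_call_a = per_call.checkout_succeeded(start_a)  # A finishes at t=2, measured from t=0
```

With the shared slot, checkout A reports a duration of 1.0 although it ran for 2.0; keeping the start time with the caller removes the shared mutable state, so no lock is needed.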
| def connection_created(self, conn_id: int) -> None: | ||
| # Always record start time: logging or publishing may be enabled by the time | ||
| # connection_ready is called to compute the duration. | ||
| self._conn_created_start = time.monotonic() |
Same race condition here as above.
| clientId=self.description._topology_settings._topology_id, | ||
| failure=self._error_message(selector), | ||
| ) | ||
| ss.failed(self._error_message(selector)) |
This will log self.description at the time of creation instead of at the time of failure. If SDAM updates self.description while this loop is waiting and before it calls ss.failed, we log the old description instead of the new.
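The stale-snapshot problem can be sketched with plain strings standing in for topology descriptions. `Topology` and `SelectionTelemetry` here are hypothetical stand-ins, not the driver's classes.

```python
from dataclasses import dataclass, field


class Topology:
    """Hypothetical stand-in whose description is replaced by SDAM updates."""

    def __init__(self) -> None:
        self.description = "Unknown"


@dataclass
class SelectionTelemetry:
    """Hypothetical telemetry object created at the start of selection."""

    description: str  # snapshot taken at construction
    logged: list = field(default_factory=list)

    def failed_with_snapshot(self, failure: str) -> None:
        # Logs whatever the description was when selection began.
        self.logged.append((failure, self.description))

    def failed(self, failure: str, topology_description: str) -> None:
        # Logs the description the caller reads at failure time.
        self.logged.append((failure, topology_description))


topology = Topology()
ss = SelectionTelemetry(topology.description)

# An SDAM update lands while the selection loop is still waiting.
topology.description = "ReplicaSetNoPrimary"

ss.failed_with_snapshot("timeout")
ss.failed("timeout", topology.description)
```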
Co-authored-by: Noah Stapp <noah@noahstapp.com>
- Fix race condition in _CmapTelemetry: remove shared _conn_created_start and _checkout_start slots; connection_ready now takes creation_time from AsyncConnection.creation_time, checkout_started returns the start time and checkout_succeeded/failed accept it as a parameter
- Move SRV monitor failure log into _telemetry.py as log_srv_monitor_failure
- Add _log_retry helper to _ClientConnectionRetryable to deduplicate log_command_retry call sites
- Fix _ServerSelectionTelemetry.failed to accept live topology_description at failure time instead of using the stale snapshot from construction
| await self.checkin(conn) | ||
|
|
||
| def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None: | ||
| def _raise_if_not_ready(self, emit_event: bool, checkout_start: float) -> None: |
Is the re-ordering and renaming of checkout_started_time intentional?
Restored to original signature
|
|
||
| async def _get_conn( | ||
| self, checkout_started_time: float, handler: Optional[_MongoClientErrorHandler] = None | ||
| self, handler: Optional[_MongoClientErrorHandler] = None, checkout_start: float = 0.0 |
Same here. Also, does a default of 0 make sense here?
Restored to original signature
| self._raise_wait_queue_timeout(checkout_started_time) | ||
| self._raise_if_not_ready(checkout_started_time, emit_event=False) | ||
| self._raise_wait_queue_timeout(checkout_start) | ||
| self._raise_if_not_ready(emit_event=False, checkout_start=checkout_start) |
| **extra, | ||
| ) | ||
|
|
||
| def pool_created(self, non_default_options: dict[str, Any]) -> None: |
Need docstrings on these public methods for consistency with the other telemetry classes.
Added :allthedocstrings:
| topologyDescription=self.description, | ||
| clientId=self.description._topology_settings._topology_id, | ||
| ) | ||
| ss = _ServerSelectionTelemetry( |
Can we pass this through as an argument from select_server so we don't have to make two _ServerSelectionTelemetry per call?
| @@ -74,6 +69,7 @@ def __init__( | |||
| self._events = None | |||
Can we cut all three of these now that _SdamTelemetry exists?
| remainingTimeMS=remaining_time_ms, | ||
| ) | ||
|
|
||
| def failed(self, failure: str, topology_description: Any = None) -> None: |
topology_description appears to always be passed
- Restore original _raise_if_not_ready and _get_conn parameter names/order
- Add docstrings to all public methods on _CmapTelemetry and _SdamTelemetry
- Remove redundant _publish/_listener/_events attrs from Server.__init__
- Avoid creating two _ServerSelectionTelemetry per select_server call by threading ss through select_servers and _select_server
- Make _ServerSelectionTelemetry.failed topology_description a required arg
- Move SRV monitor failure log to telemetry.py (log_srv_monitor_failure)
…t_servers/_select_server
| serverHost=server.description.address[0], | ||
| serverPort=server.description.address[1], | ||
| ) | ||
| ss.succeeded(server.description.address[0], server.description.address[1]) |
Similar to the ss.failed() issue earlier: should we pass the current description to be logged instead of the potentially stale one saved to ss at the start of selection?
Pull request overview
This PR extends the existing internal _CommandTelemetry pattern to CMAP, heartbeat, SDAM, and server-selection telemetry by consolidating structured logging and APM event publishing into pymongo/_telemetry.py, creating a single integration point for future OpenTelemetry hooks.
Changes:
- Introduces internal telemetry helpers (_CmapTelemetry, _HeartbeatTelemetry, _SdamTelemetry, _ServerSelectionTelemetry) and logging helpers (log_srv_monitor_failure, log_command_retry) in pymongo/_telemetry.py.
- Refactors sync/async Topology, Pool, Monitor, Server, and MongoClient codepaths to delegate telemetry/logging behavior to the consolidated helpers.
- Updates affected tests/utilities to account for Topology.select_servers()/_select_server() now returning an additional telemetry value.
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| test/utils.py | Updates helper(s) to unpack the new Topology._select_server / select_servers return shape. |
| test/utils_selection_tests.py | Updates server selection scenario harness to unpack select_servers results. |
| test/test_read_preferences.py | Adjusts nearest-mode test to iterate servers from the updated select_servers return shape. |
| test/test_mongos_load_balancing.py | Adjusts helper to iterate servers from the updated select_servers return shape. |
| test/asynchronous/utils.py | Async equivalent updates for unpacking updated topology selection returns. |
| test/asynchronous/utils_selection_tests.py | Async equivalent updates for server selection scenario harness. |
| test/asynchronous/test_read_preferences.py | Async equivalent updates for nearest-mode test iteration. |
| test/asynchronous/test_mongos_load_balancing.py | Async equivalent updates for helper iteration. |
| pymongo/synchronous/topology.py | Routes SDAM + server-selection structured logging/APM through _telemetry.py; changes select_servers to return telemetry info. |
| pymongo/synchronous/server.py | Routes server closed events/logging through _SdamTelemetry. |
| pymongo/synchronous/pool.py | Routes CMAP structured logging/APM through _CmapTelemetry across pool/connection lifecycle. |
| pymongo/synchronous/monitor.py | Routes heartbeat structured logging/APM through _HeartbeatTelemetry and SRV monitor failure logging helper. |
| pymongo/synchronous/mongo_client.py | Routes retry log entries through log_command_retry. |
| pymongo/asynchronous/topology.py | Async equivalent of telemetry consolidation and select_servers return change. |
| pymongo/asynchronous/server.py | Async equivalent of routing server close telemetry through _SdamTelemetry. |
| pymongo/asynchronous/pool.py | Async equivalent of routing CMAP telemetry through _CmapTelemetry. |
| pymongo/asynchronous/monitor.py | Async equivalent of routing heartbeat + SRV failure telemetry through _telemetry.py. |
| pymongo/asynchronous/mongo_client.py | Async equivalent of routing retry log entries through log_command_retry. |
| pymongo/_telemetry.py | Centralizes CMAP, heartbeat, SDAM, server-selection telemetry classes and retry/SRV logging helpers. |
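For callers, the changed return shape means the server list is now the first element of a tuple. A minimal sketch of the two ways to consume it, using a hypothetical `select_servers` stand-in rather than the real Topology method:

```python
class SelectionTelemetry:
    """Hypothetical stand-in for the per-call telemetry object."""


def select_servers(addresses: set[str]) -> tuple[list[str], SelectionTelemetry]:
    """Hypothetical selector: returns the matches plus the telemetry handle."""
    return sorted(addresses), SelectionTelemetry()


candidates = {"b.example:27017", "a.example:27017"}

# Index form, as used in the updated tests.
by_index = select_servers(candidates)[0]

# Unpacking form: names both halves and raises if the tuple length changes.
servers, telemetry = select_servers(candidates)
```

Either form works; unpacking is arguably easier to read at call sites that previously iterated over the return value directly.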
| deprioritized_servers: Optional[list[Server]] = None, | ||
| ) -> list[Server]: | ||
| ) -> tuple[list[Server], _ServerSelectionTelemetry]: | ||
| """Return a list of Servers matching selector, or time out. |
| deprioritized_servers: Optional[list[Server]] = None, | ||
| ) -> list[Server]: | ||
| ) -> tuple[list[Server], _ServerSelectionTelemetry]: | ||
| """Return a list of Servers matching selector, or time out. |
| # Cached at construction: this object is short-lived (one heartbeat check) so | ||
| # listener registration and logging level are stable for its lifetime. | ||
| self._should_publish = listeners is not None and listeners.enabled_for_server_heartbeat | ||
| self._should_log = _SDAM_LOGGER.isEnabledFor(logging.DEBUG) | ||
|
|
|
|
||
| def __init__( | ||
| self, | ||
| topology_id: ObjectId, |
| _events = events() if listeners is not None and listeners.enabled_for_server else None # type: ignore[misc] | ||
| self._sdam = _SdamTelemetry(topology_id, listeners, _events) # type: ignore[arg-type] |
| _events = events() if listeners is not None and listeners.enabled_for_server else None # type: ignore[misc] | ||
| self._sdam = _SdamTelemetry(topology_id, listeners, _events) # type: ignore[arg-type] |
| latencies = ", ".join( | ||
| "%s: %sms" % (server.description.address, server.description.round_trip_time) | ||
| for server in (c._get_topology()).select_servers(readable_server_selector, _Op.TEST) | ||
| for server in ((c._get_topology()).select_servers(readable_server_selector, _Op.TEST))[ | ||
| 0 | ||
| ] |
| latencies = ", ".join( | ||
| "%s: %sms" % (server.description.address, server.description.round_trip_time) | ||
| for server in await (await c._get_topology()).select_servers( | ||
| readable_server_selector, _Op.TEST | ||
| ) | ||
| for server in ( | ||
| await (await c._get_topology()).select_servers(readable_server_selector, _Op.TEST) | ||
| )[0] |
| def writable_addresses(topology): | ||
| return { | ||
| server.description.address | ||
| for server in topology.select_servers(writable_server_selector, _Op.TEST) | ||
| for server in (topology.select_servers(writable_server_selector, _Op.TEST))[0] | ||
| } |
| async def writable_addresses(topology): | ||
| return { | ||
| server.description.address | ||
| for server in await topology.select_servers(writable_server_selector, _Op.TEST) | ||
| for server in (await topology.select_servers(writable_server_selector, _Op.TEST))[0] | ||
| } |
PYTHON-5846
Changes in this PR
Consolidates all APM event publishing and structured logging into pymongo/_telemetry.py, giving OpenTelemetry support a single place to hook into. Adds four new telemetry classes and one helper following the _CommandTelemetry pattern from #2891:
- _CmapTelemetry: connection pool and connection lifecycle events, owned by Pool
- _HeartbeatTelemetry: server heartbeat events, owned per-check by Monitor
- _SdamTelemetry: topology and server description change events, owned by Topology and Server
- _ServerSelectionTelemetry: server selection log entries (log-only per spec), constructed per select_server call
- log_command_retry: retry log entries for retryable reads and writes
Test Plan
Covered by existing test suites (test_connection_monitoring.py, test_heartbeat_monitoring.py, test_sdam_monitoring_spec.py, test_connection_logging.py, test_server_selection_logging.py, unified format spec tests). No new public API.
Checklist
Checklist for Author
Checklist for Reviewer