Add websocket_max_workers and run async callbacks directly on the loop#3826
Conversation
camdecoster
left a comment
There was a problem hiding this comment.
Are there any protections in place for the case of a server getting hammered and increasing the thread count continuously (when an app fires four sync callbacks right away)?
| pending_get_props: Dict[str, queue.Queue] = {} | ||
| # Track pending get_props requests. Values are queue.Queue (threadpool / | ||
| # sync path) or asyncio.Future (event-loop / async path). | ||
| pending_get_props: Dict[str, Any] = {} |
There was a problem hiding this comment.
Could this type be tightened up a bit?
|
|
||
| # The connection's event loop, used to dispatch async callbacks as | ||
| # tasks and to resolve get_props futures. | ||
| loop = asyncio.get_running_loop() |
There was a problem hiding this comment.
What do you think of enabling debug mode for the loop here (and in a similar place for Quart)? You could use the existing debug config variable to turn this on. This way, users would get warning when a callback is executing slowly. You could also add a way for users to configure loop.slow_callback_duration.
There was a problem hiding this comment.
Good idea, maybe as a follow up.
|
|
||
| # The connection's event loop, used to dispatch async callbacks as | ||
| # tasks and to resolve get_props futures. | ||
| loop = asyncio.get_running_loop() |
There was a problem hiding this comment.
Another option (maybe in tandem) would be to add a watchdog thread outside of the pool that could keep an eye out for bottlenecked callbacks.
| executor = self.create_callback_executor( | ||
| getattr(dash_app, "_websocket_max_workers", 4) | ||
| ) |
There was a problem hiding this comment.
What do you think of moving this inside the try below (along with the assignment to sender_task? If an error occurs before the try, the executor is never shut down.
| pending_callbacks: Dict[str, concurrent.futures.Future] = {} | ||
| # Create a per-connection thread pool executor so that long-lived | ||
| # callbacks on one connection cannot starve worker threads for others. | ||
| # pylint: disable=protected-access |
There was a problem hiding this comment.
Delete these stale pylint comments.
|
camdecoster
left a comment
There was a problem hiding this comment.
This seems good overall. I think the debug addition would be good to add later, but this addresses the underlying issue.
| @@ -209,7 +215,7 @@ def get_callback_executor( | |||
| return self._callback_executor | |||
|
|
|||
| def shutdown_executor(self, wait: bool = True) -> None: | |||
There was a problem hiding this comment.
Is this function needed anymore?
| # Create WebSocket callback instance | ||
| # Async callbacks (incl. persistent ones) run as tasks on | ||
| # the event loop; sync callbacks go to the threadpool. | ||
| # pylint: disable=protected-access |
There was a problem hiding this comment.
I think you can still remove these stale pylint comments.



websocket_max_workersargument, default to 4.