From 2f70af18cdf11c35496438e0bde7bedaad004a43 Mon Sep 17 00:00:00 2001 From: Michael Graves Date: Mon, 11 May 2026 15:43:22 -0400 Subject: [PATCH 1/5] Added notifier restart method and tests --- can/notifier.py | 45 +++++++++++++++++------- test/notifier_test.py | 82 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 12 deletions(-) diff --git a/can/notifier.py b/can/notifier.py index cb91cf7b4..c28c24ea4 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -158,28 +158,21 @@ def bus(self) -> BusABC | tuple["BusABC", ...]: return tuple(self._bus_list) def add_bus(self, bus: BusABC) -> None: - """Add a bus for notification. + """Add a bus for notification.""" + self._bus_list.append(bus) + self._start_reader(bus) - :param bus: - CAN bus instance. - :raises ValueError: - If the *bus* is already assigned to an active :class:`~can.Notifier`. - """ - # add bus to notifier registry + def _start_reader(self, bus: BusABC) -> None: + """Internal helper to spin up the actual background worker for a bus.""" Notifier._registry.register(bus, self) - # add bus to internal bus list - self._bus_list.append(bus) - file_descriptor: int = -1 try: file_descriptor = bus.fileno() except NotImplementedError: - # Bus doesn't support fileno, we fall back to thread based reader pass if self._loop is not None and file_descriptor >= 0: - # Use bus file descriptor to watch for messages self._loop.add_reader(file_descriptor, self._on_message_available, bus) self._readers.append(file_descriptor) else: @@ -218,6 +211,10 @@ def stop(self, timeout: float = 5.0) -> None: for bus in self._bus_list: Notifier._registry.unregister(bus, self) + self._readers = [] + # Clear any pending asyncio tasks to prevent stale references + self._tasks.clear() + def _rx_thread(self, bus: BusABC) -> None: # determine message handling callable early, not inside while loop if self._loop: @@ -317,6 +314,30 @@ def find_instances(bus: BusABC) -> tuple["Notifier", ...]: """ return Notifier._registry.find_instances(bus) + def restart(self) -> None: + """Restarts the Notifier if it has been stopped. + + :raises RuntimeWarning: If the notifier is already running. + """ + with self._lock: + if not self._stopped: + raise RuntimeWarning("Notifier is already running.") + + self._stopped = False + self.exception = None + # Note: _bus_list is preserved from previous run + + for bus in self._bus_list: + self._start_reader(bus) + + # Re-trigger listeners if they have a start method + for listener in self.listeners: + if hasattr(listener, "start"): + try: + listener.start() + except Exception as e: + logger.error("Failed to restart listener: %s", e) + def __exit__( self, exc_type: type[BaseException] | None, diff --git a/test/notifier_test.py b/test/notifier_test.py index d8512a00b..924a3492c 100644 --- a/test/notifier_test.py +++ b/test/notifier_test.py @@ -72,6 +72,58 @@ def test_registry(self): # find_instance must return the existing instance self.assertEqual(can.Notifier.find_instances(bus), (notifier,)) + def test_restart(self): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + reader = can.BufferedReader() + notifier = can.Notifier(bus, [reader], 0.1) + + bus.send(can.Message(arbitration_id=0x123)) + self.assertIsNotNone(reader.get_message(1)) + + notifier.stop() + self.assertTrue(notifier.stopped) + + new_reader = can.BufferedReader() + notifier.listeners = [new_reader] + + notifier.restart() + self.assertFalse(notifier.stopped) + + # Small settling time for the thread to actually start + time.sleep(0.2) + + # Send and Verify + msg = can.Message(arbitration_id=0xABC) + bus.send(msg) + + recv = new_reader.get_message(1) + self.assertIsNotNone(recv, "Failed to receive message after restart") + self.assertEqual(recv.arbitration_id, 0xABC) + + notifier.stop() + + def test_restart_registry_lifecycle(self): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + notifier = can.Notifier(bus, [], 0.1) + notifier.stop() + + # Verify it is removed from registry + self.assertEqual(can.Notifier.find_instances(bus), ()) + + notifier.restart() + + # Verify it is back in the registry and blocking others + self.assertEqual(can.Notifier.find_instances(bus), (notifier,)) + self.assertRaises(ValueError, can.Notifier, bus, [], 0.1) + + notifier.stop() + + def test_double_restart_warning(self): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + with can.Notifier(bus, [], 0.1) as notifier: + # Attempting to restart an already running notifier should warn + with self.assertRaises(RuntimeWarning): + notifier.restart() class AsyncNotifierTest(unittest.TestCase): def test_asyncio_notifier(self): @@ -88,6 +140,36 @@ async def run_it(): asyncio.run(run_it()) + def test_asyncio_notifier_restart(self): + async def run_it(): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + reader = can.AsyncBufferedReader() + notifier = can.Notifier( + bus, [reader], 0.1, loop=asyncio.get_running_loop() + ) + + notifier.stop() + + # In some cases, creating a new listener is the safest path for a restart + new_reader = can.AsyncBufferedReader() + notifier.listeners = [new_reader] + + notifier.restart() + + # Small yield to let the selector register the FD + await asyncio.sleep(0.1) + + bus.send(can.Message(arbitration_id=0x123)) + + recv_msg = await asyncio.wait_for(new_reader.get_message(), 1.5) + + self.assertIsNotNone(recv_msg) + self.assertEqual(recv_msg.arbitration_id, 0x123) + + notifier.stop() + + asyncio.run(run_it()) + if __name__ == "__main__": unittest.main() From e499966c7e33d5542a26509de36f76ebd5aad0d7 Mon Sep 17 00:00:00 2001 From: Michael Graves Date: Mon, 11 May 2026 15:47:57 -0400 Subject: [PATCH 2/5] Updated documentation for notifier restart --- doc/changelog.d/2055.added.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 doc/changelog.d/2055.added.rst diff --git a/doc/changelog.d/2055.added.rst b/doc/changelog.d/2055.added.rst new file mode 100644 index 000000000..9a23eca1c --- /dev/null +++ b/doc/changelog.d/2055.added.rst @@ -0,0 +1 @@ +Added new Notifier Restart method From f2a3f304d97b7c290ac9adfb9d6a75b5fc34f222 Mon Sep 17 00:00:00 2001 From: Michael Graves Date: Mon, 11 May 2026 15:51:27 -0400 Subject: [PATCH 3/5] Corrected formatting for changelog --- doc/changelog.d/2055.added.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/changelog.d/2055.added.rst b/doc/changelog.d/2055.added.rst index 9a23eca1c..7586aaa26 100644 --- a/doc/changelog.d/2055.added.rst +++ b/doc/changelog.d/2055.added.rst @@ -1 +1 @@ -Added new Notifier Restart method +Added new Notifier Restart method. From b4f3aa119e7d90aab66f3f44a488c2d5bb5bfa80 Mon Sep 17 00:00:00 2001 From: Michael Graves Date: Tue, 12 May 2026 15:56:48 -0400 Subject: [PATCH 4/5] Fixed general linting issues --- can/notifier.py | 15 +++++---------- test/notifier_test.py | 43 ++++++++++++++++++++++--------------------- 2 files changed, 27 insertions(+), 31 deletions(-) diff --git a/can/notifier.py b/can/notifier.py index c28c24ea4..0ecd4ed1a 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -10,11 +10,7 @@ from collections.abc import Awaitable, Callable, Iterable from contextlib import AbstractContextManager from types import TracebackType -from typing import ( - Any, - Final, - NamedTuple, -) +from typing import Any, Final, NamedTuple from can.bus import BusABC from can.listener import Listener @@ -101,7 +97,6 @@ def find_instances(self, bus: BusABC) -> tuple["Notifier", ...]: class Notifier(AbstractContextManager["Notifier"]): - _registry: Final = _NotifierRegistry() def __init__( @@ -316,20 +311,20 @@ def find_instances(bus: BusABC) -> tuple["Notifier", ...]: def restart(self) -> None: """Restarts the Notifier if it has been stopped. - + :raises RuntimeWarning: If the notifier is already running. """ with self._lock: if not self._stopped: raise RuntimeWarning("Notifier is already running.") - + self._stopped = False self.exception = None # Note: _bus_list is preserved from previous run - + for bus in self._bus_list: self._start_reader(bus) - + # Re-trigger listeners if they have a start method for listener in self.listeners: if hasattr(listener, "start"): diff --git a/test/notifier_test.py b/test/notifier_test.py index 924a3492c..24a68c1b5 100644 --- a/test/notifier_test.py +++ b/test/notifier_test.py @@ -76,46 +76,46 @@ def test_restart(self): with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: reader = can.BufferedReader() notifier = can.Notifier(bus, [reader], 0.1) - + bus.send(can.Message(arbitration_id=0x123)) self.assertIsNotNone(reader.get_message(1)) - + notifier.stop() self.assertTrue(notifier.stopped) - + new_reader = can.BufferedReader() notifier.listeners = [new_reader] - + notifier.restart() self.assertFalse(notifier.stopped) - + # Small settling time for the thread to actually start time.sleep(0.2) - + # Send and Verify msg = can.Message(arbitration_id=0xABC) bus.send(msg) - + recv = new_reader.get_message(1) self.assertIsNotNone(recv, "Failed to receive message after restart") self.assertEqual(recv.arbitration_id, 0xABC) - + notifier.stop() def test_restart_registry_lifecycle(self): with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: notifier = can.Notifier(bus, [], 0.1) notifier.stop() - + # Verify it is removed from registry self.assertEqual(can.Notifier.find_instances(bus), ()) - + notifier.restart() - + # Verify it is back in the registry and blocking others self.assertEqual(can.Notifier.find_instances(bus), (notifier,)) self.assertRaises(ValueError, can.Notifier, bus, [], 0.1) - + notifier.stop() def test_double_restart_warning(self): @@ -125,6 +125,7 @@ def test_double_restart_warning(self): with self.assertRaises(RuntimeWarning): notifier.restart() + class AsyncNotifierTest(unittest.TestCase): def test_asyncio_notifier(self): async def run_it(): @@ -147,25 +148,25 @@ async def run_it(): notifier = can.Notifier( bus, [reader], 0.1, loop=asyncio.get_running_loop() ) - + notifier.stop() - + # In some cases, creating a new listener is the safest path for a restart new_reader = can.AsyncBufferedReader() - notifier.listeners = [new_reader] - + notifier.listeners = [new_reader] + notifier.restart() - + # Small yield to let the selector register the FD await asyncio.sleep(0.1) - + bus.send(can.Message(arbitration_id=0x123)) - + recv_msg = await asyncio.wait_for(new_reader.get_message(), 1.5) - + self.assertIsNotNone(recv_msg) self.assertEqual(recv_msg.arbitration_id, 0x123) - + notifier.stop() asyncio.run(run_it()) From e40cadb742ec9f23dcd57390f0f8028a16dfa831 Mon Sep 17 00:00:00 2001 From: Michael Graves Date: Tue, 12 May 2026 16:42:44 -0400 Subject: [PATCH 5/5] Fixing remaining linting errors --- can/notifier.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/can/notifier.py b/can/notifier.py index 0ecd4ed1a..e52cd6857 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -12,6 +12,7 @@ from types import TracebackType from typing import Any, Final, NamedTuple +from can import CanError from can.bus import BusABC from can.listener import Listener from can.message import Message @@ -225,18 +226,20 @@ def _rx_thread(self, bus: BusABC) -> None: if msg := bus.recv(self.timeout): with self._lock: handle_message(msg) - except Exception as exc: # pylint: disable=broad-except + except CanError as exc: self.exception = exc + # Always notify the system when the bus fails if self._loop is not None: self._loop.call_soon_threadsafe(self._on_error, exc) - # Raise anyway - raise - elif not self._on_error(exc): - # If it was not handled, raise the exception here - raise else: - # It was handled, so only log it - logger.debug("suppressed exception: %s", exc) + self._on_error(exc) + logger.error("CAN error in notifier thread: %s", exc) + except Exception as exc: + # Catching other runtime errors to prevent silent thread death + self.exception = exc + logger.critical("Unexpected error in notifier thread: %s", exc) + self._on_error(exc) + raise # Re-raise unexpected non-CAN errors def _on_message_available(self, bus: BusABC) -> None: if msg := bus.recv(0): @@ -330,7 +333,7 @@ def restart(self) -> None: if hasattr(listener, "start"): try: listener.start() - except Exception as e: + except (AttributeError, RuntimeError, TypeError, ValueError) as e: logger.error("Failed to restart listener: %s", e) def __exit__(