import logging
import threading
from typing import Callable, Dict, List

_logger = logging.getLogger(__name__)


class EventDispatcher:
    """Central event dispatcher for registering and firing events.

    Listeners are stored on the class itself, so every caller shares one
    registry. Access to the registry is serialized with a class-level lock.
    """

    # Maps an event name to its listeners, in registration order.
    _listeners: Dict[str, List[Callable]] = {}
    # Guards every read and write of ``_listeners``.
    _lock = threading.Lock()

    @classmethod
    def register(cls, event_name: str, func: Callable) -> None:
        """Register a listener for a specific event.

        Args:
            event_name: Name of the event to listen for.
            func: Callable invoked with the arguments passed to ``dispatch``.
                Registering the same callable twice makes it run twice.
        """
        with cls._lock:
            cls._listeners.setdefault(event_name, []).append(func)

    @classmethod
    def dispatch(cls, event_name: str, *args, **kwargs) -> None:
        """Dispatch an event to all registered listeners.

        Listeners run in registration order. An exception raised by one
        listener is logged with its traceback and suppressed, so the
        remaining listeners still run. Dispatching an event that has no
        listeners is a no-op.

        Args:
            event_name: Name of the event being fired.
            *args: Positional arguments forwarded to every listener.
            **kwargs: Keyword arguments forwarded to every listener.
        """
        # Snapshot under the lock, then call listeners outside it: the lock
        # is not reentrant, so a listener that registers or dispatches would
        # deadlock if it were invoked while the lock is held.
        with cls._lock:
            listeners = list(cls._listeners.get(event_name, []))

        for listener in listeners:
            try:
                listener(*args, **kwargs)
            except Exception:
                # Best-effort delivery: report the failure instead of hiding
                # it, then carry on with the next listener.
                _logger.exception(
                    "Listener %r failed while handling event %r",
                    listener,
                    event_name,
                )


def listen_event(event_name: str) -> Callable[[Callable], Callable]:
    """Decorator to register a function as an event listener.

    Args:
        event_name: Name of the event the decorated function listens for.

    Returns:
        A decorator that registers the function with ``EventDispatcher``
        and returns the function unchanged.
    """

    def decorator(func: Callable) -> Callable:
        EventDispatcher.register(event_name, func)
        return func

    return decorator