33 lines
1.1 KiB
Python
33 lines
1.1 KiB
Python
from typing import Callable, Dict, List
|
|
import threading
|
|
|
|
class EventDispatcher:
|
|
"""Central event dispatcher for registering and firing events."""
|
|
_listeners: Dict[str, List[Callable]] = {}
|
|
_lock = threading.Lock()
|
|
|
|
@classmethod
|
|
def register(cls, event_name: str, func: Callable):
|
|
"""Register a listener for a specific event."""
|
|
with cls._lock:
|
|
cls._listeners.setdefault(event_name, []).append(func)
|
|
|
|
@classmethod
|
|
def dispatch(cls, event_name: str, *args, **kwargs):
|
|
"""Dispatch an event to all registered listeners."""
|
|
with cls._lock:
|
|
listeners = list(cls._listeners.get(event_name, []))
|
|
for listener in listeners:
|
|
try:
|
|
listener(*args, **kwargs)
|
|
except Exception as e:
|
|
# Optionally log the exception
|
|
pass
|
|
|
|
def listen_event(event_name: str):
|
|
"""Decorator to register a function as an event listener."""
|
|
def decorator(func: Callable):
|
|
EventDispatcher.register(event_name, func)
|
|
return func
|
|
return decorator
|