Files
natureinpots_community/app/hooks.py

33 lines
1.1 KiB
Python

from typing import Callable, Dict, List
import threading
class EventDispatcher:
"""Central event dispatcher for registering and firing events."""
_listeners: Dict[str, List[Callable]] = {}
_lock = threading.Lock()
@classmethod
def register(cls, event_name: str, func: Callable):
"""Register a listener for a specific event."""
with cls._lock:
cls._listeners.setdefault(event_name, []).append(func)
@classmethod
def dispatch(cls, event_name: str, *args, **kwargs):
"""Dispatch an event to all registered listeners."""
with cls._lock:
listeners = list(cls._listeners.get(event_name, []))
for listener in listeners:
try:
listener(*args, **kwargs)
except Exception as e:
# Optionally log the exception
pass
def listen_event(event_name: str):
"""Decorator to register a function as an event listener."""
def decorator(func: Callable):
EventDispatcher.register(event_name, func)
return func
return decorator