Files

47 lines
1.4 KiB
Python

# plugins/utility/celery.py
from celery import Celery
# 1) Create Celery instance at import time so tasks can import celery_app
# Include your plugin's tasks module and leave room to add more as you go.
celery_app = Celery(
__name__,
include=[
'plugins.utility.tasks',
# 'plugins.backup.tasks',
# 'plugins.cleanup.tasks',
# …add other plugin.task modules here
]
)
def init_celery(app):
"""
Configure the global celery_app with Flask settings and
ensure tasks run inside the Flask application context.
"""
# Pull broker/backend from Flask config
celery_app.conf.broker_url = app.config['CELERY_BROKER_URL']
celery_app.conf.result_backend = app.config.get(
'CELERY_RESULT_BACKEND',
app.config['CELERY_BROKER_URL']
)
celery_app.conf.update(app.config)
# Wrap all tasks in Flask app context
TaskBase = celery_app.Task
class ContextTask(TaskBase):
def __call__(self, *args, **kwargs):
with app.app_context():
return super().__call__(*args, **kwargs)
celery_app.Task = ContextTask
# And auto-discover any other tasks modules you add under the plugins/ namespace
celery_app.autodiscover_tasks([
'plugins.utility',
# 'plugins.backup',
# 'plugins.cleanup',
# …your other plugins here
], force=True)
return celery_app