# plugins/utility/celery.py
"""Shared Celery application for the plugins, bound to Flask by ``init_celery``."""

from celery import Celery

# Create the Celery instance at import time so task modules can import
# ``celery_app``. List each plugin's tasks module here; add more as you go.
celery_app = Celery(
    __name__,
    include=[
        'plugins.utility.tasks',
        # 'plugins.backup.tasks',
        # 'plugins.cleanup.tasks',
        # …add other plugin.task modules here
    ],
)


def init_celery(app) -> Celery:
    """Configure the global ``celery_app`` from a Flask application.

    Copies the broker/result-backend settings (and then the whole Flask
    config mapping) onto ``celery_app.conf``, replaces ``celery_app.Task``
    with a base class that runs every task call inside ``app``'s application
    context, and imports the plugin task modules.

    Calling this more than once is safe: the context wrapper is replaced
    rather than stacked, so tasks only ever enter the context of the most
    recently supplied ``app``.

    Args:
        app: Flask application. ``app.config`` must contain
            ``CELERY_BROKER_URL``; ``CELERY_RESULT_BACKEND`` is optional and
            falls back to the broker URL.

    Returns:
        The module-level ``celery_app``, now configured.

    Raises:
        KeyError: If ``CELERY_BROKER_URL`` is missing from ``app.config``.
    """
    # Pull broker/backend from the Flask config.
    broker_url = app.config['CELERY_BROKER_URL']
    celery_app.conf.broker_url = broker_url
    # NOTE(review): the broker URL is reused as the result backend when none
    # is configured; not every broker scheme is a valid backend — confirm
    # this default suits the deployed broker.
    celery_app.conf.result_backend = app.config.get(
        'CELERY_RESULT_BACKEND',
        broker_url,
    )
    celery_app.conf.update(app.config)

    # Wrap all tasks in the Flask app context. If a previous call already
    # installed a ContextTask, start again from the base it wrapped so that
    # wrappers (and their app contexts) do not nest.
    current_base = celery_app.Task
    task_base = getattr(current_base, '_context_task_base', current_base)

    class ContextTask(task_base):
        """Task base class that executes the task body inside ``app``'s context."""

        # Remembered so a later init_celery() call can unwrap this class.
        _context_task_base = task_base

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return super().__call__(*args, **kwargs)

    celery_app.Task = ContextTask

    # Discover task modules under the plugins/ namespace. ``force=True``
    # performs the import now instead of deferring it, so this must stay
    # after the Task base class has been swapped in.
    celery_app.autodiscover_tasks(
        [
            'plugins.utility',
            # 'plugins.backup',
            # 'plugins.cleanup',
            # …your other plugins here
        ],
        force=True,
    )

    return celery_app