RQ’s CronScheduler is a simple scheduler that allows you to enqueue functions at regular intervals.
New in version 2.4.0.
`CronScheduler` is still in beta, use at your own risk!
CronScheduler provides a lightweight way to enqueue recurring jobs without the complexity of traditional cron systems. It’s perfect for:
Advantages over traditional cron:
Create a cron configuration file (cron_config.py):
from rq import cron
from myapp import cleanup_database, send_notifications, send_daily_report
# Run database cleanup every 5 minutes
cron.register(
    cleanup_database,
    queue_name='repeating_tasks',
    interval=300  # 5 minutes in seconds
)
# Send notifications every hour
cron.register(
    send_notifications,
    queue_name='repeating_tasks',
    interval=3600  # 1 hour in seconds
)
# Send daily reports every 24 hours
cron.register(
    send_daily_report,
    queue_name='repeating_tasks',
    args=('daily',),
    kwargs={'format': 'pdf'},
    interval=86400  # 24 hours in seconds
)
$ rq cron cron_config.py
That’s it! Your jobs will now be automatically enqueued at the specified intervals.
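Intervals are given as raw second counts, which are easy to mistype. One option, sketched here with only the standard library, is to derive them from `datetime.timedelta`. The helper name `seconds` and the constants are hypothetical and not part of RQ.

```python
from datetime import timedelta


def seconds(**kwargs) -> int:
    """Convert timedelta keyword arguments (minutes=5, hours=1, ...) to whole seconds."""
    return int(timedelta(**kwargs).total_seconds())


# Hypothetical named intervals for use in a cron configuration file.
EVERY_5_MINUTES = seconds(minutes=5)
EVERY_HOUR = seconds(hours=1)
EVERY_DAY = seconds(days=1)
```

A value such as `EVERY_HOUR` could then be passed as the `interval` argument of a `cron.register(...)` call.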
Key concepts:
CronScheduler only enqueues jobs; RQ workers handle execution
CronScheduler is not a job executor; it is a scheduler that enqueues functions periodically. Here’s how the system works:
CronScheduler starts
CronScheduler sleeps until the next job is due
Configuration files use the global register function to define scheduled jobs:
# my_cron_config.py
from rq.cron import register
from myapp.tasks import cleanup_database, generate_reports, backup_files
# Simple job - runs every 60 seconds
register(cleanup_database, queue_name='maintenance', interval=60)
# Job with arguments
register(
    generate_reports,
    queue_name='reports',
    args=('daily',),
    kwargs={'format': 'pdf', 'email': True},
    interval=3600
)
# Job with custom timeout and TTL settings
register(
    backup_files,
    queue_name='backup',
    interval=86400,    # Once per day
    timeout=1800,      # 30 minutes max execution time
    result_ttl=3600,   # Keep results for 1 hour
    failure_ttl=86400  # Keep failed jobs for 1 day
)
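As a rough mental model of the enqueue-only design, the following self-contained sketch shows how an interval scheduler could turn registered intervals into a "sleep until the next job is due" loop. It does not use RQ and is not RQ's implementation. `ToyScheduler`, `ScheduledJob`, and `simulate` are hypothetical names; a plain list stands in for the queues, and the assumption that every job is first due at time 0 is a choice of this sketch only.

```python
import heapq
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass(order=True)
class ScheduledJob:
    next_run: float                        # only field used for heap ordering
    name: str = field(compare=False)
    queue_name: str = field(compare=False)
    interval: float = field(compare=False)


class ToyScheduler:
    """Hypothetical interval scheduler: it enqueues job names, never runs them."""

    def __init__(self) -> None:
        self._heap: List[ScheduledJob] = []
        # Stand-in for real queues: (time, queue name, job name) tuples.
        self.enqueued: List[Tuple[float, str, str]] = []

    def register(self, name: str, queue_name: str, interval: float) -> None:
        if interval <= 0:
            raise ValueError("interval must be positive")
        # Assumption of this sketch: every job is first due at time 0.
        heapq.heappush(self._heap, ScheduledJob(0.0, name, queue_name, interval))

    def enqueue_due(self, now: float) -> None:
        # Enqueue every job whose due time has passed, then reschedule it.
        while self._heap and self._heap[0].next_run <= now:
            job = heapq.heappop(self._heap)
            self.enqueued.append((now, job.queue_name, job.name))
            job.next_run = now + job.interval
            heapq.heappush(self._heap, job)

    def seconds_until_next(self, now: float) -> float:
        # How long the loop may sleep before the earliest job is due.
        return max(0.0, self._heap[0].next_run - now)


def simulate(scheduler: ToyScheduler, until: float) -> None:
    """Advance a simulated clock instead of calling time.sleep()."""
    now = 0.0
    while scheduler._heap and now <= until:
        scheduler.enqueue_due(now)
        now += scheduler.seconds_until_next(now)


scheduler = ToyScheduler()
scheduler.register("cleanup_database", queue_name="maintenance", interval=60)
scheduler.register("generate_reports", queue_name="reports", interval=3600)
simulate(scheduler, until=120)
```

With the simulated clock run to 120 seconds, `cleanup_database` is recorded at 0, 60, and 120, and `generate_reports` once at 0. Nothing in the loop calls the job functions, which mirrors the split between scheduling and worker execution.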
Since configuration files are standard Python modules, you can include conditional logic. For example:
import os
from rq.cron import register
from myapp.tasks import cleanup
if os.getenv("ENABLE_CLEANUP") == "true":
    register(cleanup, queue_name="maintenance", interval=600)
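The same idea can be extended to make intervals configurable per environment. The helpers below are a minimal standard-library sketch. `env_flag`, `env_interval`, and the `CLEANUP_INTERVAL` variable are hypothetical names, and `env_flag` is deliberately more lenient than the strict `== "true"` comparison used in the example.

```python
import os


def env_flag(name: str, default: bool = False) -> bool:
    """Interpret an environment variable as a boolean switch."""
    value = os.getenv(name)
    if value is None:
        return default
    return value.strip().lower() in {"1", "true", "yes", "on"}


def env_interval(name: str, default: int) -> int:
    """Read an interval in seconds from the environment, falling back to a default."""
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    seconds = int(raw)  # raises ValueError on non-numeric input
    if seconds <= 0:
        raise ValueError(f"{name} must be a positive number of seconds, got {seconds}")
    return seconds
```

In a configuration file, `env_flag("ENABLE_CLEANUP")` could guard the `register(...)` call and `env_interval("CLEANUP_INTERVAL", 600)` could supply its `interval`.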
$ rq cron my_cron_config.py
You can specify a dotted module path instead of a file path:
$ rq cron myapp.cron_config
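To illustrate the difference between the two forms, here is one way a loader could accept either a file path or a dotted module path using only `importlib`. This is a sketch of the general technique, not RQ's actual loading code, and `load_config` is a hypothetical name.

```python
import importlib
import importlib.util
import os
from types import ModuleType


def load_config(config_path: str) -> ModuleType:
    """Import a config given either a file path or a dotted module path."""
    if os.path.isfile(config_path):
        # File path: build a module spec from the file and execute it.
        module_name = os.path.splitext(os.path.basename(config_path))[0]
        spec = importlib.util.spec_from_file_location(module_name, config_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot load config file: {config_path}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module
    # Otherwise treat the argument as a dotted module path importable from sys.path.
    return importlib.import_module(config_path)
```

A dotted path only resolves if the package is importable, which is where an option for additional import paths becomes useful.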
The rq cron CLI command accepts the following options:
--url, -u: Redis connection URL (env: RQ_REDIS_URL)
--config, -c: Python module with RQ settings (env: RQ_CONFIG)
--path, -P: Additional Python import paths (can be specified multiple times)
--logging-level, -l: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL; default: INFO)
--worker-class, -w: Dotted path to RQ Worker class
--job-class, -j: Dotted path to RQ Job class
--queue-class: Dotted path to RQ Queue class
--connection-class: Dotted path to Redis client class
--serializer, -S: Dotted path to serializer class
Positional argument:
config_path: File path or module path to your cron configuration
Example:
$ rq cron myapp.cron_config --url redis://localhost:6379/1 --path src
from redis import Redis
from rq.cron import CronScheduler
from myapp.tasks import cleanup_database, send_reports
# Create a cron scheduler
redis_conn = Redis(host='localhost', port=6379, db=0)
cron = CronScheduler(connection=redis_conn, logging_level='INFO')
# Register jobs
cron.register(
    cleanup_database,
    queue_name='maintenance',
    interval=3600
)
cron.register(
    send_reports,
    queue_name='reports',
    args=('daily',),
    interval=86400
)
# Start the scheduler (this will block until interrupted)
try:
    cron.start()
except KeyboardInterrupt:
    print("Shutting down cron scheduler...")