Each RQ object (queues, workers, jobs) has a connection keyword
argument that can be passed to the constructor - this is the recommended way of handling connections.
from redis import Redis
from rq import Queue
redis = Redis('localhost', 6789)
q = Queue(connection=redis)
This pattern allows for different connections to be passed to different objects:
from rq import Queue
from redis import Redis
conn1 = Redis('localhost', 6379)
conn2 = Redis('remote.host.org', 9836)
q1 = Queue('foo', connection=conn1)
q2 = Queue('bar', connection=conn2)
Every job that is enqueued on a queue will know what connection it belongs to. The same goes for the workers.
Note:
The use of Connection context manager is deprecated.
Please don't use Connection in your scripts.
Instead, use explicit connection management.
There is a better approach if you want to use multiple connections, though. Each RQ object instance, upon creation, will use the topmost Redis connection on the RQ connection stack, which is a mechanism to temporarily replace the default connection to be used.
An example will help to understand it:
from rq import Queue, Connection
from redis import Redis
with Connection(Redis('localhost', 6379)):
q1 = Queue('foo')
with Connection(Redis('remote.host.org', 9836)):
q2 = Queue('bar')
q3 = Queue('qux')
assert q1.connection != q2.connection
assert q2.connection != q3.connection
assert q1.connection == q3.connection
You can think of this as if, within the Connection context, every newly
created RQ object instance will have the connection argument set implicitly.
Enqueueing a job with q2 will enqueue it in the second (remote) Redis
backend, even when outside of the connection context.
If your code does not allow you to use a with statement, for example, if you
want to use this to set up a unit test, you can use the push_connection() and
pop_connection() methods instead of using the context manager.
import unittest
from rq import Queue
from rq import push_connection, pop_connection
class MyTest(unittest.TestCase):
def setUp(self):
push_connection(Redis())
def tearDown(self):
pop_connection()
def test_foo(self):
"""Any queues created here use local Redis."""
q = Queue()
To use redis sentinel, you must specify a dictionary in the configuration file. Using this setting in conjunction with the systemd or docker containers with the automatic restart option allows workers and RQ to have a fault-tolerant connection to the redis.
SENTINEL: {
'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],
'MASTER_NAME': 'master',
'DB': 2,
'USERNAME': 'redis-user',
'PASSWORD': 'redis-secret',
'SOCKET_TIMEOUT': None,
'CONNECTION_KWARGS': { # Eventual addition Redis connection arguments
'ssl_ca_path': None,
},
'SENTINEL_KWARGS': { # Eventual Sentinels connections arguments
'username': 'sentinel-user',
'password': 'sentinel-secret',
},
}
To avoid potential issues with hanging Redis commands, specifically the blocking BLPOP command,
RQ automatically sets a socket_timeout value that is 10 seconds higher than the dequeue_timeout. The dequeue_timeout is computed as 15 seconds shorter than the worker_ttl value.
Here are the following computed timeout values if you were not to adjust anything.
| Setting | Default Value |
|---|---|
worker_ttl |
420 |
connection_timeout |
415 |
dequeue_timeout |
405 |
If you prefer to manually set the socket_timeout value,
make sure that the value being set is higher than the dequeue_timeout (which is 405 by default).
from redis import Redis
from rq import Queue
conn = Redis('localhost', 6379, socket_timeout=500)
q = Queue(connection=conn)
Setting a socket_timeout with a lower value than the dequeue_timeout will cause a TimeoutError
since it will interrupt the worker while it gets new jobs from the queue.
The encoding and decoding of Redis objects occur in multiple locations within the codebase,
which means that the decode_responses=True argument of the Redis connection is not currently supported.
from redis import Redis
from rq import Queue
conn = Redis(..., decode_responses=True) # This is not supported
q = Queue(connection=conn)