A job is a Python object representing a function that is invoked asynchronously in a worker (background) process. Any Python function can be invoked asynchronously by pushing a reference to the function and its arguments onto a queue. This is called enqueueing.
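To make the idea concrete, here is a minimal sketch of what enqueueing amounts to. It assumes an in-memory deque in place of Redis, and the helper names (pending, enqueue, work_one) are hypothetical; this illustrates the concept and is not RQ's implementation.

```python
import operator
import pickle
from collections import deque

# Assumption: an in-memory deque stands in for the Redis-backed queue.
pending = deque()

def enqueue(func, *args, **kwargs):
    """Serialize a reference to func plus its arguments and queue it."""
    pending.append(pickle.dumps((func, args, kwargs)))

def work_one():
    """What a worker does: pop one payload, deserialize it, call it."""
    func, args, kwargs = pickle.loads(pending.popleft())
    return func(*args, **kwargs)

enqueue(operator.add, 3, 4)   # nothing is computed yet
result = work_one()           # the call happens here, on the "worker" side
print(result)
```

Note that pickle stores the function by name rather than by value, so whoever unpickles the payload must be able to import that function.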
To put jobs on queues, first declare a function:
import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())
Noticed anything? There's nothing special about this function! Any Python function call can be put on an RQ queue.
To put this potentially expensive word count for a given URL in the background, simply do this:
import time

from redis import Redis
from rq import Queue
from somewhere import count_words_at_url

# Tell RQ what Redis connection to use
redis_conn = Redis()
q = Queue(connection=redis_conn)  # no args implies the default queue

# Delay execution of count_words_at_url('http://nvie.com')
job = q.enqueue(count_words_at_url, 'http://nvie.com')
print(job.result)   # => None

# Now, wait a while, until the worker is finished
time.sleep(2)
print(job.result)   # => 889
If you want to put the work on a specific queue, simply specify its name:
q = Queue('low', connection=redis_conn)
q.enqueue(count_words_at_url, 'http://nvie.com')
Notice the Queue('low') in the example above? You can use any queue name, so
you can distribute work across queues however you like. A common naming
pattern is to name your queues after priorities (e.g. high, medium,
low).
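Queue names carry no built-in meaning. With priority-style names, the priority comes from the order in which a worker is told to read the queues. The sketch below illustrates that idea with plain deques standing in for Redis-backed queues; the next_job helper and the job labels are hypothetical.

```python
from collections import deque

# Assumption: plain deques stand in for Redis-backed queues.
queues = {name: deque() for name in ('high', 'medium', 'low')}

def next_job(queue_names):
    """Return (queue_name, job) from the first non-empty queue, else None."""
    for name in queue_names:
        if queues[name]:
            return name, queues[name].popleft()
    return None

queues['low'].append('rebuild-search-index')
queues['high'].append('send-password-reset')

# Listing 'high' first means its jobs are picked up before 'low' ones,
# even though the 'low' job was added earlier.
order = ['high', 'medium', 'low']
first = next_job(order)
second = next_job(order)
print(first, second)
```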
For cases where you want to pass in options to .enqueue() itself (rather than
to the job function), use .enqueue_call(), which is the more explicit
counterpart. A typical use case for this is to pass in a timeout argument:
q = Queue('low', connection=redis_conn)
q.enqueue_call(func=count_words_at_url,
               args=('http://nvie.com',),
               timeout=30)
For cases where the web process doesn't have access to the source code running in the worker (i.e. code base X invokes a delayed function from code base Y), you can also pass the function as a string reference:
q = Queue('low', connection=redis_conn)
q.enqueue('my_package.my_module.my_func', 3, 4)
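A string reference only has to be resolved on the side that actually runs the job. As a rough sketch of how a dotted path can be turned back into a callable (the resolve helper is hypothetical, and standard-library functions are used as stand-ins for my_package.my_module.my_func):

```python
from importlib import import_module

def resolve(dotted_path):
    """Turn 'package.module.func' into the callable it names."""
    module_name, _, attr = dotted_path.rpartition('.')
    return getattr(import_module(module_name), attr)

# The caller only needs the string; the import happens on the resolving side.
func = resolve('os.path.basename')
print(func('/var/log/app.log'))
```

This is why the enqueueing process never needs the function's source: only the process that resolves the path must be able to import the module.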
You can also enqueue instance methods. Given a trivial class:
import requests

class URLWordCounter(object):
    def __init__(self, url):
        self.url = url

    def count_words(self):
        return len(requests.get(self.url).text.split())
It's easy to put it to use with RQ:
from redis import Redis
from rq import Queue
from somewhere_else import URLWordCounter

redis_conn = Redis()
q = Queue(connection=redis_conn)

wc = URLWordCounter('http://nvie.com')
q.enqueue(wc.count_words)
With RQ, you don't have to set up any queues upfront, and you don't have to specify any channels, exchanges, routing rules, or whatnot. You can just put jobs onto any queue you want. As soon as you enqueue a job to a queue that does not exist yet, it is created on the fly.
RQ does not use an advanced broker to do the message routing for you. You may consider this an awesome advantage or a handicap, depending on the problem you're solving.
Lastly, it does not speak a portable protocol, since it depends on pickle to serialize the jobs, so it's a Python-only system.
When jobs get enqueued, the queue.enqueue() method returns a Job instance.
This is nothing more than a proxy object that can be used to check the outcome
of the actual job.
For this purpose, it has a convenience result accessor property that
returns None while the job is not yet finished, and the job's return value
once it has finished (which is non-None only if the job returns a value in
the first place, of course).
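The proxy behaviour described above can be pictured with a small stand-in class. JobProxy and its perform() method are hypothetical names used for illustration only; this is not RQ's Job class.

```python
class JobProxy(object):
    """Hypothetical stand-in for a job handle; not RQ's Job class."""

    def __init__(self, func, args):
        self.func = func
        self.args = args
        self._result = None      # stays None until the job has run

    @property
    def result(self):
        return self._result

    def perform(self):
        """Called on the 'worker' side to run the job and store the outcome."""
        self._result = self.func(*self.args)

job = JobProxy(len, (['a', 'b', 'c'],))
before = job.result     # the job has not run yet
job.perform()           # in RQ this would happen in a worker process
after = job.result
print(before, after)
```

One consequence of this design: a result of None alone cannot distinguish an unfinished job from a finished job whose function returned None.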
@job decorator

If you're familiar with Celery, you might be used to its @task decorator.
As of RQ 0.3, there is a similar decorator:
import time

from rq.decorators import job

# my_redis_conn is assumed to be an existing Redis connection
@job('low', connection=my_redis_conn, timeout=5)
def add(x, y):
    return x + y

job = add.delay(3, 4)
time.sleep(1)
print(job.result)
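If you are curious how a decorator can give a function a .delay() method, the following sketch shows the general pattern. It assumes an in-memory deque instead of a real queue, and it is an illustration of the technique rather than the code in rq.decorators.

```python
import functools
from collections import deque

pending = deque()   # assumption: in-memory stand-in for the named queues

def job(queue_name):
    """Hypothetical decorator that adds a .delay() method to a function."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)   # direct calls stay synchronous

        def delay(*args, **kwargs):
            # Record the call instead of running it.
            pending.append((queue_name, func, args, kwargs))

        wrapper.delay = delay
        return wrapper
    return decorator

@job('low')
def add(x, y):
    return x + y

add.delay(3, 4)                       # queued, not executed
queue_name, func, args, kwargs = pending.popleft()
print(queue_name, func(*args, **kwargs))
```

Calling add(3, 4) directly still runs the function in the current process; only add.delay(3, 4) defers it.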
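For testing purposes, you can enqueue jobs without delegating the actual
execution to a worker (available since version 0.3.1). To do this, pass the
async=False argument into the Queue constructor. Newer RQ releases rename
this argument to is_async, since async is a reserved word from Python 3.7
onward: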
>>> q = Queue('low', async=False)
>>> job = q.enqueue(fib, 8)
>>> job.result
21
The above code runs without an active worker and executes fib(8)
synchronously within the same process. You may know this behaviour from Celery
as ALWAYS_EAGER.
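The example above uses fib without defining it. One definition consistent with the result shown, together with a sketch of what "executing synchronously within the same process" means, could look like this. Both fib and EagerQueue are assumptions made for illustration; EagerQueue is not an RQ class.

```python
def fib(n):
    """Iterative Fibonacci with fib(0) == 0 and fib(1) == 1."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a

class EagerQueue(object):
    """Hypothetical queue that runs each job immediately, in-process."""

    def enqueue(self, func, *args, **kwargs):
        # No serialization and no worker: just call the function right away.
        return func(*args, **kwargs)

q = EagerQueue()
value = q.enqueue(fib, 8)
print(value)
```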
To learn about workers, see the workers documentation.
Technically, you can put any Python function call on a queue, but that does not mean it's always wise to do so. Some things to consider before putting a job on a queue:
- Make sure that the function's __module__ is importable by the worker. In
  particular, this means that you cannot enqueue functions that are declared in
  the __main__ module.
- RQ workers will only run on systems that implement fork(). Most notably,
  this means it is not possible to run the workers on Windows.
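Both considerations can be checked roughly before enqueueing. The helpers below (is_enqueueable, platform_has_fork) are hypothetical and run in the enqueueing process, so they only approximate what the worker will see; treat them as a sanity check, not a guarantee.

```python
import os
from importlib import import_module

def is_enqueueable(func):
    """Return True if func lives in a module a separate process could import."""
    module_name = getattr(func, '__module__', None)
    if module_name in (None, '__main__'):
        return False
    try:
        module = import_module(module_name)
    except ImportError:
        return False
    # The module must expose the very same function under its own name.
    return getattr(module, func.__name__, None) is func

def platform_has_fork():
    """The os module only exposes fork() on Unix-like systems."""
    return hasattr(os, 'fork')

def local_helper():
    pass

# Simulate a function that was declared in a script run as __main__.
local_helper.__module__ = '__main__'

print(is_enqueueable(os.path.join), is_enqueueable(local_helper))
print(platform_has_fork())
```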