一.安装
aptitude install rabbitmq-server rabbitmqctl add_user mike cheese rabbitmqctl add_vhost /django_tutorials rabbitmqctl set_permissions -p /django_tutorials mike ".*" ".*" ".*" /etc/init.d/rabbitmq-server stop /ect/init.d/rabbitmq-server start pip install celery django-celery
二.新建app
manage.py startapp celery_test
三.修改settings.py
二.修改celery_test/models.py
from django.db import models class CounterModel(models.Model): count = models.IntegerField(default=0) def __unicode__(self): return unicode(self.count)
三.修改celery_test/tasks.py
from celery.task import task from celery import current_task from celery_test.models import CounterModel from time import sleep @task() def add_two_numbers(a, b): """This is where you do the bulk of the your queued task needs to do""" sleep(10) count = CounterModel.objects.create(count = a + b) return count @task() def do_something_long(): for i in range(100): sleep(0.2) current_task.update_state(state="PROGRESS", meta={'current':i, 'total':100})
四.运行
manage.py schemamigration celery_test --initial manage.py migrate manage.py celeryd -l info manage.py shell
五.修改django_test/views.py
from celery.result import AsyncResult from celery_test.tasks import do_something_long from django.core.urlresolvers import reverse from django.utils import simplejson as json def start_celery_task(request): task = do_something_long.delay() return HttpResponseRedirect( "%s%s" % ('/celery_progress?task_id=', task.id) ) def monitor_celery_task(request): if 'task_id' in request.GET: task_id = request.GET['task_id'] else: return HttpResponse('No task_id passed.') task = AsyncResult(task_id) data = task.result or task.state return HttpResponse(json.dumps(data), mimetype='application/json')
六.修改django_test/urls.py
url(r'^celery_test/', 'django_test.views.start_celery_task'), url(r'^celery_progress/', 'django_test.views.monitor_celery_task'),