
How to run asynchronous tasks in Django DRF using celery?

At times we need to run resource-intensive tasks in Python, and it is always recommended to run such tasks asynchronously. In this blog, we explain the steps to run asynchronous tasks in Django DRF using Celery.

Introduction:

In Django, when you run a time- and resource-intensive task, you may notice a performance delay, because Python runs code sequentially in a single thread. Sometimes the API can even return a timeout error. We can run such time-intensive tasks asynchronously to overcome the delay and timeout issues, and to do so we are going to use Celery in Django.

Where can Celery be used?

Celery (asynchronous tasks) can be used for various purposes, some of the common examples are listed below.

  • Sending Bulk Emails

  • Image processing

  • Video processing

  • Data Analytics

 

You can find the full source code for the Celery with Django example project in the GitHub repo.

In this tutorial, we are going to integrate celery with an existing Django application for running video processing tasks asynchronously.

My Django project is located in the following path

/home/ubuntu/django_celery_asynchronous_tasks.

So in this tutorial, I’ll be referring to the following paths and directories.

django_project_directory : /home/ubuntu/django_celery_asynchronous_tasks/

django_core_app_directory : /home/ubuntu/django_celery_asynchronous_tasks/core/

Kindly replace the above paths with your project directory paths.

Flow Diagram:


django-asynchronous-flow-diagram

Prerequisites:

  • Python 3.8

  • Django 4.0.2 or above

  • Postman Application

Steps:

Step 1: Celery Installation

  • If you want to create a Django project from scratch, please read the following article.

  • If you would like to add asynchronous tasks to an existing Django project, navigate to the Django project directory.

cd <django_project_directory>

# For example:
cd /home/ubuntu/django_celery_asynchronous_tasks/
  • Install the celery package using the pip package manager.

python3 -m pip install -U celery[redis]
  • Verify the installed celery version by executing the following command.

celery --version

It’ll prompt the version details as shown in the following screenshot.


django-celery-version


  • Install Redis using the apt package manager.

sudo apt install redis -y
  • Start redis-server using the following command.

sudo systemctl start redis-server.service
  • Enable redis-server. This makes the Redis server start automatically whenever the machine reboots.

sudo systemctl enable redis-server.service
  • Check the status of the redis server, by running the below command.

redis-cli ping
  • If redis is running successfully, it will return a PONG message.

A sample screenshot for the redis-cli ping command is shown below.



redis-server-cli-response

Step 2: Celery Integration with Django

  • Now it's time to integrate celery with our Django project.

  • Navigate to the Django core app directory.

cd <django_project_directory>/<core_app>

# in my case:
cd /home/ubuntu/django_celery_asynchronous_tasks/core/
  • Create a file named celery.py with the following snippet.

from __future__ import absolute_import, unicode_literals
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
# "core" is the name of the root app.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')

app = Celery(
    'celery_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

# Load task modules from all registered Django apps.
app.autodiscover_tasks()
  • In the above code, we created a Celery instance called app; to use Celery within our project, we simply import this instance.

A sample celery.py file can be found in the following git repository file.

  • Add the below code to the __init__.py file present in the current directory.

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ('celery_app',)

A sample __init__.py file can be found in the following git repository file.

  • Now navigate to the project root directory.

cd ../
  • Then run the following command from the project root to test the above configuration.

celery -A core worker -l info

core: the name of the package that contains the celery.py file created above.

An example execution screenshot is given below for your reference.


django-celery-worker-check

Step 3: Creating tasks

Creating an app

  • Now we are going to create the tasks which are going to be run asynchronously.

  • Tasks can be created within any app of the project, in a file named tasks.py (so that Celery can auto-discover them).

  • Navigate to the Django project directory and create a Django app named multimedia.

cd <django_project_directory>
django-admin startapp multimedia
  • Add the app name "multimedia" to the settings.py file as shown in the following example screenshot.
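For reference, the relevant part of settings.py might then look like this (a sketch; your INSTALLED_APPS will contain other entries as well):

```python
# core/settings.py (fragment)
# Register the new app so Django loads it and Celery can auto-discover
# its tasks.py module.
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    # ... other default Django apps ...
    'rest_framework',
    'multimedia',
]
```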

Installing moviepy

  • We are now going to create an API that takes a video file name along with a time range, and creates a subclip from that video.

  • We need to install a video processing library named moviepy, to do that run the following command in the terminal.

python3 -m pip install moviepy
  • Create a file named tasks.py inside the multimedia app and add the following snippet.

from moviepy.editor import VideoFileClip

def clip_video(video_file_path, request_data, clip_path):
    video = VideoFileClip(video_file_path)
    clip = video.subclip(request_data['start_time'], request_data['end_time'])
    clip.write_videofile(clip_path)

An example of tasks.py can be found in the following git repo.

  • Now open the views.py file and add the following code. It defines a POST method API that receives a video file name, the time range to clip, and the new clip's file name.

from rest_framework.response import Response
from rest_framework.views import APIView
from .tasks import clip_video


class Clip_Video_API(APIView):

    def post(self, request):
        try:
            data = request.data
            media_dir = "/home/ubuntu/media/"
            video_file_path = media_dir + data['video_file']
            clip_path = media_dir + data['clip_name']
            clip_video(video_file_path, data, clip_path)
            data, st, msg = clip_path, 200, "Video clipped successfully"
        except Exception as error:
            print(error)
            data, st, msg = "", 500, "Error while clipping the video"
        resp_json = {"data": data, "st": st, "msg": msg}
        return Response(resp_json)

An example of views.py can be found in the following git repo.
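The view above trusts the payload as-is; before queuing a long-running job, you may want to validate the request first. A minimal sketch of such a check (the validate_clip_request helper is hypothetical, not part of the original project):

```python
def validate_clip_request(data):
    """Return a list of problems found in a clip-request payload.

    Expects the same keys as the Postman example in this tutorial:
    video_file, clip_name, start_time, end_time.
    """
    errors = []
    for key in ("video_file", "clip_name", "start_time", "end_time"):
        if key not in data:
            errors.append(f"missing field: {key}")
    if not errors:
        start, end = data["start_time"], data["end_time"]
        if not isinstance(start, (int, float)) or not isinstance(end, (int, float)):
            errors.append("start_time and end_time must be numbers")
        elif start < 0 or end <= start:
            errors.append("end_time must be greater than start_time (both >= 0)")
    return errors


ok = {"video_file": "example_video.mp4", "clip_name": "Edited_file.mp4",
      "start_time": 15, "end_time": 65}
bad = {"video_file": "example_video.mp4", "start_time": 65, "end_time": 15}

print(validate_clip_request(ok))   # []
print(validate_clip_request(bad))  # ['missing field: clip_name']
```

Returning the error list to the client with a 400 status before calling the task avoids queuing work that is guaranteed to fail.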

  • Create a urls.py file in the current directory (multimedia app) and add the following code.

from django.urls import path
from .views import Clip_Video_API

urlpatterns = [
    path("clip_video/", Clip_Video_API.as_view())
]
  • Open the urls.py file in the Django core app directory and append path('', include('multimedia.urls')), to its urlpatterns (importing include from django.urls), as in the following screenshot.

  • Now run the Django Application in the terminal by running the following command.

cd <django_project_directory>
python3 manage.py runserver
  • Open the Postman application, enter the request URL, choose POST as the request method, then in the body select JSON and add the following data.

{
    "video_file" : "example_video.mp4",
    "clip_name" : "Edited_file.mp4",
    "start_time" : 15,
    "end_time" : 65
}
  • An example screenshot has been shown below for your reference.



postman-testing-async-test-check-without-async

  • Now hit the Send button to send the request to the locally running Django Application.

In my case, my video file is 8 minutes long, and I'm going to clip a 50-second segment (from the 15th second to the 65th second).

  • Wait for the request to return the response, and you’ll see the response duration as in the following example screenshot.



postman-testing-async-test

In my case, the response duration is 1 minute and 14 seconds.

Refactor the function as a celery task

  • Now we are going to turn the function clip_video() in tasks.py into a Celery task by importing the @shared_task decorator and applying it to the function.

  • Open the tasks.py file and change the code as shown in the snippet below.

from moviepy.editor import VideoFileClip
from celery import shared_task

@shared_task
def clip_video(video_file_path, request_data, clip_path):
    video = VideoFileClip(video_file_path)
    clip = video.subclip(request_data['start_time'], request_data['end_time'])
    clip.write_videofile(clip_path)

An example tasks.py file can be found in the following git repo.

  • Open the views.py file and change the line where we call clip_video as in the following code.

clip_video(video_file_path, data, clip_path)
# to
clip_video.delay(video_file_path, data, clip_path)
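Conceptually, .delay() serializes the arguments and hands the call to a worker instead of running it inline, returning a handle (an AsyncResult) immediately. A rough stdlib analogy of this fire-and-forget pattern (illustration only; Celery uses a message broker and separate worker processes, not a thread pool):

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)

def clip_video(video_file_path, request_data, clip_path):
    # Stand-in for the real moviepy work; pretend this takes a long time.
    return f"clipped {video_file_path} -> {clip_path}"

# Synchronous call: the caller blocks until the work is done.
result = clip_video("in.mp4", {"start_time": 15, "end_time": 65}, "out.mp4")

# "delay"-style call: the work is queued and the caller immediately gets
# a handle (Celery returns an AsyncResult; here we get a Future).
future = executor.submit(clip_video, "in.mp4",
                         {"start_time": 15, "end_time": 65}, "out.mp4")
print(future.result())  # block only when/if we actually need the outcome
```

This is why the API below responds in milliseconds: the view no longer waits for the video processing to finish.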

Running worker

  • Open a new terminal and navigate to the Django project directory.

cd <django_project_directory>

# in my case:
cd /home/ubuntu/django_celery_asynchronous_tasks/
  • Start the celery worker by running the following command in the terminal.

celery -A core worker -l info
  • Now you can see the celery worker picking up the defined task clip_video as shown in the below reference screenshot.



django-celery-worker-example

Calling tasks

  • Now open the Postman Application and send the request to the clip_video API.

  • In the meantime, you can see that the task multimedia.tasks.clip_video is invoked and processes the video clipping, as shown in the below reference screenshot.



django-celery-worker-example

  • Now open the Postman application and check the request duration. You can see that the API queued the video-processing task asynchronously and returned the response immediately.


postman-testing-async-test-example

 

Step 4: Saving Celery results

  • We can save the results of Celery tasks to our database for various purposes. To do that we need a library named django-celery-results.

  • Install the Django celery results by running the command.

python3 -m pip install django-celery-results
  • Add django_celery_results to the INSTALLED_APPS in our project’s settings.py.

INSTALLED_APPS = [
    ...
    'multimedia',
    'django_celery_results',
]
  • Run the migrations command to create the Celery database.

python3 manage.py migrate django_celery_results
  • Now you can see that the tables for django_celery_results have been created.

An example screenshot is given below for your reference.


django-celery-database


  • Append the following line to the settings.py file.

CELERY_RESULT_BACKEND = 'django-db'
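Putting the two settings.py changes of this step together, the additions might look like this (a fragment; django-celery-results then records each task's outcome in its TaskResult table):

```python
# core/settings.py (fragment)
INSTALLED_APPS = [
    # ... existing apps ...
    'django_celery_results',
]

# Store task results in the Django database via django-celery-results.
CELERY_RESULT_BACKEND = 'django-db'
```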

Step 5: Run Celery as a daemon

  • In order to start Celery automatically after a server reboot, we are going to run it under the supervisor service.

  • On Ubuntu, we can install supervisor as a Debian package, to do so run the following command.

sudo apt-get install supervisor -y
  • Navigate to the supervisor configuration directory and create a file named celery.conf.

cd /etc/supervisor/conf.d/
touch celery.conf
  • Add the following lines to the celery.conf file.

; ==========================================
; celery worker config
; ==========================================

[program:worker]
command=/home/ubuntu/.django_env/bin/celery -A core worker -l info
directory=/home/ubuntu/django_celery_asynchronous_tasks/
user=ubuntu
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker.err.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600
killasgroup=true
priority=998
; priority 998 executes first and then 999

Note:

[program:worker]: The name of the supervisor daemon process (any name of your choice).

command: The command used to run the worker, including the full path to the celery executable.

To find the path to celery, run the command which celery and prepend its output to the celery arguments.

directory: The directory in which the command runs. Here it is the path of the Django project.

user: The username on the server. Usually ubuntu here.

stdout_logfile: Logfile location (any path you prefer).

stderr_logfile: Error logfile location (any path you prefer).

A sample configuration file celery.conf can be found in the following git repository file.

  • To have supervisor re-read the newly added celery configuration, execute the below command.

sudo supervisorctl reread
  • To make the supervisor update the celery configuration, run the following command.

sudo supervisorctl update
  • To view the supervisor processes status run the following command.

sudo supervisorctl
# (or)
sudo supervisorctl status all
  • If the supervisor processes have not already started, run the following command to start them.

sudo supervisorctl start all


djagno-celery-supervisor-daemon-service


Helpful Commands for your reference:

sudo supervisorctl start <process_name>

sudo supervisorctl stop <process_name>

sudo supervisorctl restart <process_name>
