CELERY

Download & install reddis

go to

Install django,celery redis

pip install django celery redis

Create a Django Project:

Run the following command to create a new Django project:

django-admin startproject project_name

Navigate to the Project Directory:

Move into the project directory:

cd project_name

Create a Django App:

Create a new app within the project:

python manage.py startapp app_name

Create a URL Mapping

app_name/urls.py

# app_name/urls.py

from django.urls import path
from . import views

urlpatterns = [
    path('', views.index, name='index'),
]

app_name/views.py

# app_name/views.py

from django.http import HttpResponse

def my_view(request):
    return HttpResponse("Hello from my view!")

project settings.py

INSTALLED_APPS = [
    'app_name'
]

project urls.py

# project_name/urls.py

from django.contrib import admin
from django.urls import path, include  # Import the include function

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', include('app_name.urls')),  # Include the app's URLs
]

Start the Development Server To Check

python manage.py runserver

প্রজেক্ট ডিরেক্টরির যেখানে settings.py ফাইল আছে সেখানে celery.py নামে একটি ফাইল তৈরী করি।

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project_name.settings')

app = Celery('project_name',broker='redis://localhost:6379/0')  # Use the same name as your Django project

# This reads the configuration from the Django settings file.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

app ফোল্ডারে tasks.py নামে একটি ফাইল তৈরী করি

# app_name/tasks.py

from celery import shared_task

@shared_task
def my_task(parameter):
    # Your task logic here
    return "Task completed"

প্রজেক্ট ফোল্ডারে cmd ওপেন করে নিচের কমান্ড রান করি

celery -A project_name worker -l info

app ফোল্ডারে views.py নামে ফাইলটি নিচের মত করি

from django.shortcuts import render
from django.http import HttpResponse  # Import HttpResponse

from .tasks import my_task

def index(request):
    # ...
    result = my_task.delay(5)

    return HttpResponse("Task triggered successfully")  # Return an HttpResponse

এবার চেক করি celery টাস্কটি কাজ করছে কিনা।

Last updated