非同期処理とWebSocket
Django Channels(WebSocket)
セットアップ
pip install channels channels-redis
# settings.py
INSTALLED_APPS = [..., 'channels']
ASGI_APPLICATION = 'config.asgi.application'
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {'hosts': [('127.0.0.1', 6379)]},
},
}
Consumer(WebSocketハンドラ)
# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group = f'chat_{self.room_name}'
# グループに参加
await self.channel_layer.group_add(
self.room_group, self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(
self.room_group, self.channel_name
)
async def receive(self, text_data):
data = json.loads(text_data)
message = data['message']
username = self.scope['user'].username
# グループ全体に送信
await self.channel_layer.group_send(
self.room_group,
{
'type': 'chat_message',
'message': message,
'username': username,
}
)
async def chat_message(self, event):
await self.send(text_data=json.dumps({
'message': event['message'],
'username': event['username'],
}))
ルーティング
# chat/routing.py
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]
# config/asgi.py
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import chat.routing
application = ProtocolTypeRouter({
'http': get_asgi_application(),
'websocket': AuthMiddlewareStack(
URLRouter(chat.routing.websocket_urlpatterns)
),
})
クライアント側(JavaScript)
const ws = new WebSocket(
'wss://' + window.location.host + '/ws/chat/room1/'
);
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log(data.username + ': ' + data.message);
};
ws.onopen = function() {
ws.send(JSON.stringify({
'message': 'こんにちは!'
}));
};
Celery(バックグラウンドタスク)
# config/celery.py
from celery import Celery
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# blog/tasks.py
from celery import shared_task
@shared_task
def send_notification_email(user_id, message):
from django.contrib.auth import get_user_model
from django.core.mail import send_mail
User = get_user_model()
user = User.objects.get(id=user_id)
send_mail('通知', message, 'noreply@example.com', [user.email])
# ビューから呼び出し
def create_post(request):
post = Post.objects.create(...)
send_notification_email.delay(request.user.id, '記事が公開されました')
return redirect('blog:post_list')
まとめ
- Django Channels でWebSocketリアルタイム通信を実装
- Consumer クラスで接続・受信・送信を処理
- Channel Layer(Redis)でグループブロードキャスト
- Celery でメール送信や重い処理をバックグラウンド実行
.delay() でタスクを非同期キューに投入