記事一覧へ戻る 本の順番で続きを読む

非同期処理とWebSocket - リアルタイム通信

Python3上級 | 2026/02/18 21:20

非同期処理とWebSocket

Django Channels(WebSocket)

セットアップ

pip install channels channels-redis
# settings.py
INSTALLED_APPS = [..., 'channels']

ASGI_APPLICATION = 'config.asgi.application'
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {'hosts': [('127.0.0.1', 6379)]},
    },
}

Consumer(WebSocketハンドラ)

# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group = f'chat_{self.room_name}'

        # グループに参加
        await self.channel_layer.group_add(
            self.room_group, self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(
            self.room_group, self.channel_name
        )

    async def receive(self, text_data):
        data = json.loads(text_data)
        message = data['message']
        username = self.scope['user'].username

        # グループ全体に送信
        await self.channel_layer.group_send(
            self.room_group,
            {
                'type': 'chat_message',
                'message': message,
                'username': username,
            }
        )

    async def chat_message(self, event):
        await self.send(text_data=json.dumps({
            'message': event['message'],
            'username': event['username'],
        }))

ルーティング

# chat/routing.py
from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]

# config/asgi.py
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import chat.routing

application = ProtocolTypeRouter({
    'http': get_asgi_application(),
    'websocket': AuthMiddlewareStack(
        URLRouter(chat.routing.websocket_urlpatterns)
    ),
})

クライアント側(JavaScript)

const ws = new WebSocket(
    'wss://' + window.location.host + '/ws/chat/room1/'
);

ws.onmessage = function(event) {
    const data = JSON.parse(event.data);
    console.log(data.username + ': ' + data.message);
};

ws.onopen = function() {
    ws.send(JSON.stringify({
        'message': 'こんにちは!'
    }));
};

Celery(バックグラウンドタスク)

# config/celery.py
from celery import Celery
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# blog/tasks.py
from celery import shared_task

@shared_task
def send_notification_email(user_id, message):
    from django.contrib.auth import get_user_model
    from django.core.mail import send_mail
    User = get_user_model()
    user = User.objects.get(id=user_id)
    send_mail('通知', message, 'noreply@example.com', [user.email])

# ビューから呼び出し
def create_post(request):
    post = Post.objects.create(...)
    send_notification_email.delay(request.user.id, '記事が公開されました')
    return redirect('blog:post_list')

まとめ

  • Django Channels でWebSocketリアルタイム通信を実装
  • Consumer クラスで接続・受信・送信を処理
  • Channel Layer(Redis)でグループブロードキャスト
  • Celery でメール送信や重い処理をバックグラウンド実行
  • .delay() でタスクを非同期キューに投入