• Django WebSocket
  • Django channels

    Django WebSocket

    websocket是一种协议,2008年诞生,2011年成为国际标准。它的特点是服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话。它是建立在 TCP 协议之上的协议,即握手阶段仍然是http协议。

    这里讲的是Django中的channels, channels运行于ASGI协议上,ASGI的全名是Asynchronous Server Gateway Interface。它是区别于Django使用的WSGI协议 的一种异步服务网关接口协议,正是因为它才实现了websocket

    在django中请求处理逻辑是:url ---> view, 在channels中也是一样,只是叫的名字不同罢了。它的处理逻辑是:routing ---> consumer. 所以在django中完成前后端的双向通信,需要用url--->view这套逻辑来渲染页面,而用routing--->consumer这套逻辑来进行双向的信息发送。

    使用的基本流程

    配置

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'channels',
    ]
    
    #  指定路由位置
    ASGI_APPLICATION = "django_channels_demo.routing.application"
    

    路由

    from channels.routing import ProtocolTypeRouter, URLRouter
    from django.conf.urls import url
    from demo import consumers
    
    
    application = ProtocolTypeRouter({
        'websocket': URLRouter([
            url(r'^chat/$', consumers.ChatConsumer),
        ])
    })
    

    处理请求

    from channels.generic.websocket import WebsocketConsumer
    from channels.exceptions import StopConsumer
    
    class ChatConsumer(WebsocketConsumer):
    
        def websocket_connect(self, message):
            self.accept()
    
        def websocket_receive(self, message):
            print('接收到消息', message)
            self.send(text_data='收到了')
    
        def websocket_disconnect(self, message):
            print('客户端断开连接了')
            raise StopConsumer()
    

    路由

    分析channels源码,我们会发现它支持多种路由方式:

    ProtocolTypeRouter # 根据协议类型映射(分发) URLRouter # 根据url路径路由,可以使用django.conf.url来解析路径,支持url()或者path() ChannelNameRouter # 根据channel的名字路由,channel可以理解 为广播的频道

    application = ProtocolTypeRouter({
        'websocket': URLRouter([
            url(r'^chat/$', consumers.ChatConsumer),
        ])
    })
    

    可以看到这段代码是先将 websocket协议分配给后面的URLRouter来处理了,而我们的URLRouter则是基于url路径的路由。最终达到websocket协议的 url路径的路由。

    而上面的路由又将chat的url全部交给了ChatConsumer这个类来处理了。

    视图

    这里说的视图,即是django的视图,因为它们的功能一样,所以我便叫它视图:

    我们看源码的websocket模块,发现只有四个类:

    WebsocketConsumer JsonWebsocketConsumer AsyncWebsocketConsumer AsyncJsonWebsocketConsumer

    这么一看,实际只有两个类,因为后面两个是异步版本。

    WebsocketConsumer

    class WebsocketConsumer(SyncConsumer):
        """
        Base WebSocket consumer. Provides a general encapsulation for the
        WebSocket handling model that other applications can build on.
        """
    
        groups = None
    
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            if self.groups is None:
                self.groups = []
    
        def websocket_connect(self, message):
            """
            Called when a WebSocket connection is opened.
            """
            try:
                for group in self.groups:
                    async_to_sync(self.channel_layer.group_add)(group, self.channel_name)
            except AttributeError:
                raise InvalidChannelLayerError(
                    "BACKEND is unconfigured or doesn't support groups"
                )
            try:
                self.connect()
            except AcceptConnection:
                self.accept()
            except DenyConnection:
                self.close()
    
        def connect(self):
            self.accept()
    
        def accept(self, subprotocol=None):
            """
            Accepts an incoming socket
            """
            super().send({"type": "websocket.accept", "subprotocol": subprotocol})
    
        def websocket_receive(self, message):
            """
            Called when a WebSocket frame is received. Decodes it and passes it
            to receive().
            """
            if "text" in message:
                self.receive(text_data=message["text"])
            else:
                self.receive(bytes_data=message["bytes"])
    
        def receive(self, text_data=None, bytes_data=None):
            """
            Called with a decoded WebSocket frame.
            """
            pass
    
        def send(self, text_data=None, bytes_data=None, close=False):
            """
            Sends a reply back down the WebSocket
            """
            if text_data is not None:
                super().send({"type": "websocket.send", "text": text_data})
            elif bytes_data is not None:
                super().send({"type": "websocket.send", "bytes": bytes_data})
            else:
                raise ValueError("You must pass one of bytes_data or text_data")
            if close:
                self.close(close)
    
        def close(self, code=None):
            """
            Closes the WebSocket from the server end
            """
            if code is not None and code is not True:
                super().send({"type": "websocket.close", "code": code})
            else:
                super().send({"type": "websocket.close"})
    
        def websocket_disconnect(self, message):
            """
            Called when a WebSocket connection is closed. Base level so you don't
            need to call super() all the time.
            """
            try:
                for group in self.groups:
                    async_to_sync(self.channel_layer.group_discard)(
                        group, self.channel_name
                    )
            except AttributeError:
                raise InvalidChannelLayerError(
                    "BACKEND is unconfigured or doesn't support groups"
                )
            self.disconnect(message["code"])
            raise StopConsumer()
    
        def disconnect(self, code):
            """
            Called when a WebSocket connection is closed.
            """
            pass
    

    这里定义的方法可以分成四类来看:连接, 发送消息,接收消息,断开连接。

    当建立连接时,需要定制点功能我们就可以重写:websocket_connect 同样的道理,我们可以重写:websocket_receive, websocket_disconnect,而在这些函数内部我们可以调用send方法,比如上面的当我们收到消息时,给发送者回消息,就可以调用send.显然 我们可以看到这里可以发送两种类型的数据: bytes, text即二进制和文本数据。

    JsonWebsocketConsumer

    而另一个类JsonWebsocketConsumer是WebsocketConsumer的子类,所以很好理解它改送json格式的数据,只不过对文本数据转化为json格式而已。没什么特别,下面是源码:

    class JsonWebsocketConsumer(WebsocketConsumer):
        """
        Variant of WebsocketConsumer that automatically JSON-encodes and decodes
        messages as they come in and go out. Expects everything to be text; will
        error on binary data.
        """
    
        def receive(self, text_data=None, bytes_data=None, **kwargs):
            if text_data:
                self.receive_json(self.decode_json(text_data), **kwargs)
            else:
                raise ValueError("No text section for incoming WebSocket frame!")
    
        def receive_json(self, content, **kwargs):
            """
            Called with decoded JSON content.
            """
            pass
    
        def send_json(self, content, close=False):
            """
            Encode the given content as JSON and send it to the client.
            """
            super().send(text_data=self.encode_json(content), close=close)
    
        @classmethod
        def decode_json(cls, text_data):
            return json.loads(text_data)
    
        @classmethod
        def encode_json(cls, content):
            return json.dumps(content)
    

    web端:

    据说大多数浏览器都支持websocket,WebSocket 对象作为一个构造函数,用于新建 WebSocket 实例

    ref: https://developer.mozilla.org/en-US/docs/Web/API/WebSocket

    var ws = new WebSocket('ws://localhost:8000');
    

    readyState属性

    webSocket.readyState readyState 属性返回实例对象的当前状态,共有四种

    CONNECTING:值为0,表示正在连接。 OPEN:值为1,表示连接成功,可以通信了。 CLOSING:值为2,表示连接正在关闭。 CLOSED:值为3,表示连接已经关闭,或者打开连接失败。

    回调函数

    主要是三个函数:

    webSocket.onopen # 实例对象的onopen属性,用于指定连接成功后的回调函数 webSocket.onclose # 实例对象的onclose属性,用于指定连接关闭后的回调函数 webSocket.onmessage # 实例对象的onmessage属性,用于指定收到服务器数据后的回调函数 webSocket.onerror # 实例对象的onerror属性,用于指定报错时的回调函数 webSocket.send # 用于向服务器发送数据

    onopen

    ws.onopen = function () {
      ws.send('Hello Server!');
    }
    
    // 也可以这样,同样的道理,其它几个方法也可以通过这种方式来达到各自的目的,或者指定多个回调函数
    ws.addEventListener("onopen", function(event) {
      // do sth
    });
    

    onclose

    ws.onclose = function(event) {
      // do sth
    };
    

    onmessage

    ws.onmessage = function(event) {
       var data = event.data;
      // do sth
      // 处理数据
    };
    

    send

    ws.send('your message');
    
    // 文件(二进制)
    var file = document.querySelector('input[type="file"]').files[0];
    ws.send(file);
    

    onerror

    socket.onerror = function(event) {
      // handle error event
    };
    

    bufferedAmount

    实例对象的bufferedAmount属性,表示还有多少字节的二进制数据没有发送出去。它可以用来判断发送是否结束。

    var data = new ArrayBuffer(10000000);
    socket.send(data);
    
    if (socket.bufferedAmount === 0) {
      // 发送完毕
    } else {
      // 发送还没结束
    }
    

    channels-layer

    channel类比于广播的频道,当一个频道发送了消息,只要收听这个频道的所有人都能收到广播消息。

    Channels引入了一个layer的概念,channel layer是一种通信系统,允许多个consumer实例之间互相通信,以及与外部Django程序实现互通。

    channel layer主要实现了两种概念抽象:

    channel name: channel实际上就是一个发送消息的通道,每个Channel都有一个名称,每一个拥有这个名称的人都可以往Channel里边发送消息

    group: 多个channel可以组成一个Group,每个Group都有一个名称,每一个拥有这个名称的人都可以往Group里添加/删除Channel,也可以往Group里发送消息,Group内的所有channel都可以收到,但是无法发送给Group内的具体某个Channel

    配置

    官方推荐redis作为layer

    pip3 install channels-redis

    CHANNEL_LAYERS = {
        'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {"hosts": ["redis://10.211.55.25:6379/1"],},
        },
    }
    

    基于内存

    CHANNEL_LAYERS = {
        "default": {
            "BACKEND": "channels.layers.InMemoryChannelLayer",
        }
    }
    

    将channels-layer配置添加到settings中

    处理请求

    在consumer中

    from channels.generic.websocket import WebsocketConsumer
    from asgiref.sync import async_to_sync
    
    
    class ChatGroupConsumer(WebsocketConsumer):
    
        def connect(self):
            async_to_sync(self.channel_layer.group_add)('f1', self.channel_name)
            self.accept()
    
        def receive(self, text_data=None, bytes_data=None):
            async_to_sync(self.channel_layer.group_send)('f1', {
                'type': 'call.back',
                'message': text_data
            })
    
        def call_back(self, event):
            message = event['message']
            self.send(message)
    
        def disconnect(self, code):
            async_to_sync(self.channel_layer.group_discard)('f1', self.channel_name)
    

    self.channel_name的生成方式:

        async def new_channel(self, prefix="specific."):
            """
            Returns a new channel name that can be used by something in our
            process as a specific channel.
            """
            return "%s.inmemory!%s" % (
                prefix,
                "".join(random.choice(string.ascii_letters) for i in range(12)),
            )
    

    这里有几点需要注意,group_add,f1相当于群名,而每个进来的的连接通过self.channel_name(随机字符串),即将这些连接加到这个群里,同样当断开连接里通过group_discard将人从群里踢除。

    注意async_to_sync中的type参数,我们写的是call.back但在我们定义这个函数里要写成call_back, 因为源码中将“."转换成了 ”_":

    def get_handler_name(message):
        """
        Looks at a message, checks it has a sensible type, and returns the
        handler name for that type.
        """
        # Check message looks OK
        if "type" not in message:
            raise ValueError("Incoming message has no 'type' attribute")
        if message["type"].startswith("_"):
            raise ValueError("Malformed type in message (leading underscore)")
        # Extract type and replace . with _
        return message["type"].replace(".", "_")
    

    认证

    django channels 基于cookie, session实现了认证:

    from channels.auth import AuthMiddlewareStack
    
    @database_sync_to_async
    def get_user(scope):
        pass
    
    @database_sync_to_async
    def login(scope, user, backend=None):
        pass
    
    @database_sync_to_async
    def logout(scope):
        pass
    
    def _get_user_session_key(session):
        # This value in the session is always serialized to a string, so we need
        # to convert it back to Python whenever we access it.
        return get_user_model()._meta.pk.to_python(session[SESSION_KEY])
    
    
    
    
    class AuthMiddleware(BaseMiddleware):
        """
        Middleware which populates scope["user"] from a Django session.
        Requires SessionMiddleware to function.
        """
    
        def populate_scope(self, scope):
            # Make sure we have a session
            if "session" not in scope:
                raise ValueError(
                    "AuthMiddleware cannot find session in scope. SessionMiddleware must be above it."
                )
            # Add it to the scope if it's not there already
            if "user" not in scope:
                scope["user"] = UserLazyObject()
    
        async def resolve_scope(self, scope):
            scope["user"]._wrapped = await get_user(scope)
    
    
    # Handy shortcut for applying all three layers at once
    AuthMiddlewareStack = lambda inner: CookieMiddleware(
        SessionMiddleware(AuthMiddleware(inner))
    )
    

    在最后部分我们拿到的AuthMiddlewareStack是基于cookie,session的认证。需要说明的是在channel中的scope类似于django中的request,user = scope.get("user", None) 我们可以这样获取 user. 具体使用只用等后面再更新了......

    上一篇:Drf之serializer

    下一篇:疫情之下