当前位置: 代码迷 >> 综合 >> Channels ——django实时推送系统
  详细解决方案

Channels ——django实时推送系统

热度:2   发布时间:2023-12-18 07:21:41.0

       一、基于类的consumer

 与django基于类的视图类似,Channels也提供基于类的consumers,这些consumers的类都继承自同一个类BaseConsumer。通过下面的代码可以导入该类:

from channels.generic import BaseConsumer

 Channels在处理每个消息时会实例化一个对象,来处理相应的消息。每个类实例都有两个属性,self.message当前处理的消息,self.kwargs定义路由时传递的参数。routings使用route_class来替换route,来实现基于类的consumers的路由。如下:

from channels import route, route_class
from . import consumerschannel_routing = [route_class(consumers.ChatServer, path=r"^/chat/"),
]
  • BaseConsumer

 所有Consumer类的基类,但是我们仍然可以直接使用它。

from channels.generic import BaseConsumerclass MyConsumer(BaseConsumer):method_mapping = {"channel.name.here": "method_name",}def method_name(self, message, **kwargs):pass

 method_mapping是一个字典,定义了Channel_name到方法的路由,进入到路由的消息就会传给相应的方法来处理。

  • WebsocketConsumer

 Channels更多的时候是用于websocket,而Channels提供了专门来处理websocket的consumers类:WebsocketConsumer。你可以像下面那样使用它:

from channels.generic.websockets import WebsocketConsumerclass MyConsumer(WebsocketConsumer):http_user = True   #是否需要channels.auth提供的讲django的auth获取到的user复制到channels中strict_ordering = False  #是否严格按顺序处理消息def connection_groups(self, **kwargs):return ["test"]   #返回的列表中的字符串为组名,连接/断开时会自动添加到组/从组中移除def connect(self, message, **kwargs):   #连接时调用self.message.reply_channel.send({
      "accept": True}) #True接收连接请求(默认);False拒绝连接def receive(self, text=None, bytes=None, **kwargs):  #处理消息self.send(text=text, bytes=bytes) def disconnect(self, message, **kwargs):      #断开连接时处理pass

  上面的注释比较清楚了,这里不多说。另外self.path指示当前websocket连接地址的路径。

  • JsonWebsocketConsumer

 websocket大多传输的都是json字符串,为了方便于是就有了JsonWebsocketConsumer。这个和上面的WebsocketConsumer相当类似,不同之处在于receive方法:

def receive(self, content, **kwargs):#content代表接收的json对象self.send(content)

 另外,提供了self.group_send(group_name, content)方法来组播消息,并可以通过self.close()在服务端主动关闭websocket

  • WebsocketDemultiplexer

 为了更加方便的使用websocket,channels提供了这个WebsocketDemultiplexer类。这个类的作用需要websocket中json消息传输格式为{"stream":"stream_name","payload":xxxx}stream相当于再次路由,payload真实的数据。

from channels.generic.websockets import WebsocketDemultiplexer, JsonWebsocketConsumerclass EchoConsumer(websockets.JsonWebsocketConsumer):def connect(self, message, multiplexer, **kwargs):multiplexer.send({
      "status": "I just connected!"})  #注意这里使用的是multiplexer发送返回数据def disconnect(self, message, multiplexer, **kwargs):print("Stream %s is closed" % multiplexer.stream)def receive(self, content, multiplexer, **kwargs):multiplexer.send({
      "original_message": content}) #注意这里使用的是multiplexer发送返回数据class AnotherConsumer(websockets.JsonWebsocketConsumer):def receive(self, content, multiplexer=None, **kwargs):passclass Demultiplexer(WebsocketDemultiplexer):consumers = {      #这里就是对stream的再次路由"echo": EchoConsumer,"other": AnotherConsumer,}

 WebsocketDemultiplexer这个类的功能,远不止这些,他还可以实现绑定模型,model发生改变时通知客户端,客户端也可以发送指令改变model,类似于mvvm的框架。这些功能以后再介绍。

  •  Sessions and Users

 在基于类的consumers中使用session和user,只需要两个:

class MyConsumer(WebsocketConsumer):channel_session_user = Truehttp_user = True

 channel_session_user为True则为message提供了message.channel_session和message.user两个属性,与用装饰器实现的功能一样。http_user只是复制了message.user.

  •  应用自定义的装饰器

 如果需要为自己的处理函数应用装饰器,最好的办法是重写get_handler方法。

class MyConsumer(WebsocketConsumer):def get_handler(self, *args, **kwargs):handler = super(MyConsumer, self).get_handler(*args, **kwargs)return your_decorator(handler)  #your_decorator就是自定义的装饰器方法
  •   as route 方法

 比起使用route_class,channels提供了更灵活的as_route方法。例如:

from . import consumerschannel_routing = [consumers.ChatServer.as_route(path=r"^/chat/"),
]

 感觉上面貌似没减少打字的工作量,看看这个例子:

consumers.py:

class MyGenericConsumer(WebsocketConsumer):group = 'default'group_prefix = ''def connection_groups(self, **kwargs):return ['_'.join(self.group_prefix, self.group)]

routings.py:

from . import consumerschannel_routing = [consumers.MyGenericConsumer.as_route(path=r"^/path/1/",attrs={
      'group': 'one', 'group_prefix': 'pre'}),consumers.MyGenericConsumer.as_route(path=r"^/path/2/",attrs={
      'group': 'two', 'group_prefix': 'public'}),
]

这里通过attrs参数重新定义了MyGenericConsumer实例中的对应的类属性。(这个例子其实可以用path中的正则来处理。)

二、延时发送消息

  延时发送的意义在于当前的任务不能处理,或者需要延时处理,等待一段时间之后再重试,这样就需要延时发送消息。Channels提供channels.delay的应用来处理这种场景。首先添加这个应用到INSTALLED_APPS中:

INSTALLED_APPS = [...'channels','channels.delay'
]

然后执行命令:

python manage.py rundelay

这个命令执行之后会监听asgi.delay的channels。可以通过如下代码延时发送消息。

from channels import Channeldelayed_message = {'channel': 'example_channel','content': {
      'x': 1},'delay': 10 * 1000
}
Channel('asgi.delay').send(delayed_message, immediately=True)

 这里就是10s之后,发送{'x':1}到name为example_channel的channel中去。这个功能配合自定义管理命令,可以实现每隔多长时间执行一次的后台定时任务。

 关于使用channels进行后台任务的内容,敬请期待吧!


  • 上一篇: Channels ——django实时推送系统(进阶篇一)
  • 下一篇: Channels ——django实时推送系统(实战篇一)-django后台发送邮件