一、基于类的consumer
与django基于类的视图类似,Channels也提供基于类的consumers,这些consumers的类都继承自同一个类BaseConsumer。通过下面的代码可以导入该类:
from channels.generic import BaseConsumer
Channels在处理每个消息时会实例化一个对象,来处理相应的消息。每个类实例都有两个属性,self.message当前处理的消息,self.kwargs定义路由时传递的参数。routings使用route_class来替换route,来实现基于类的consumers的路由。如下:
from channels import route, route_class
from . import consumerschannel_routing = [route_class(consumers.ChatServer, path=r"^/chat/"),
]
- BaseConsumer
所有Consumer类的基类,但是我们仍然可以直接使用它。
from channels.generic import BaseConsumerclass MyConsumer(BaseConsumer):method_mapping = {"channel.name.here": "method_name",}def method_name(self, message, **kwargs):pass
method_mapping是一个字典,定义了Channel_name到方法的路由,进入到路由的消息就会传给相应的方法来处理。
-
WebsocketConsumer
Channels更多的时候是用于websocket,而Channels提供了专门来处理websocket的consumers类:WebsocketConsumer。你可以像下面那样使用它:
from channels.generic.websockets import WebsocketConsumerclass MyConsumer(WebsocketConsumer):http_user = True #是否需要channels.auth提供的讲django的auth获取到的user复制到channels中strict_ordering = False #是否严格按顺序处理消息def connection_groups(self, **kwargs):return ["test"] #返回的列表中的字符串为组名,连接/断开时会自动添加到组/从组中移除def connect(self, message, **kwargs): #连接时调用self.message.reply_channel.send({
"accept": True}) #True接收连接请求(默认);False拒绝连接def receive(self, text=None, bytes=None, **kwargs): #处理消息self.send(text=text, bytes=bytes) def disconnect(self, message, **kwargs): #断开连接时处理pass
上面的注释比较清楚了,这里不多说。另外self.path指示当前websocket连接地址的路径。
-
JsonWebsocketConsumer
websocket大多传输的都是json字符串,为了方便于是就有了JsonWebsocketConsumer。这个和上面的WebsocketConsumer相当类似,不同之处在于receive方法:
def receive(self, content, **kwargs):#content代表接收的json对象self.send(content)
另外,提供了self.group_send(group_name, content)方法来组播消息,并可以通过self.close()在服务端主动关闭websocket
- WebsocketDemultiplexer
为了更加方便的使用websocket,channels提供了这个WebsocketDemultiplexer类。这个类的作用需要websocket中json消息传输格式为{"stream":"stream_name","payload":xxxx}stream相当于再次路由,payload真实的数据。
from channels.generic.websockets import WebsocketDemultiplexer, JsonWebsocketConsumerclass EchoConsumer(websockets.JsonWebsocketConsumer):def connect(self, message, multiplexer, **kwargs):multiplexer.send({
"status": "I just connected!"}) #注意这里使用的是multiplexer发送返回数据def disconnect(self, message, multiplexer, **kwargs):print("Stream %s is closed" % multiplexer.stream)def receive(self, content, multiplexer, **kwargs):multiplexer.send({
"original_message": content}) #注意这里使用的是multiplexer发送返回数据class AnotherConsumer(websockets.JsonWebsocketConsumer):def receive(self, content, multiplexer=None, **kwargs):passclass Demultiplexer(WebsocketDemultiplexer):consumers = { #这里就是对stream的再次路由"echo": EchoConsumer,"other": AnotherConsumer,}
WebsocketDemultiplexer这个类的功能,远不止这些,他还可以实现绑定模型,model发生改变时通知客户端,客户端也可以发送指令改变model,类似于mvvm的框架。这些功能以后再介绍。
- Sessions and Users
在基于类的consumers中使用session和user,只需要两个:
class MyConsumer(WebsocketConsumer):channel_session_user = Truehttp_user = True
channel_session_user为True则为message提供了message.channel_session和message.user两个属性,与用装饰器实现的功能一样。http_user只是复制了message.user.
- 应用自定义的装饰器
如果需要为自己的处理函数应用装饰器,最好的办法是重写get_handler方法。
class MyConsumer(WebsocketConsumer):def get_handler(self, *args, **kwargs):handler = super(MyConsumer, self).get_handler(*args, **kwargs)return your_decorator(handler) #your_decorator就是自定义的装饰器方法
- as route 方法
比起使用route_class,channels提供了更灵活的as_route方法。例如:
from . import consumerschannel_routing = [consumers.ChatServer.as_route(path=r"^/chat/"),
]
感觉上面貌似没减少打字的工作量,看看这个例子:
consumers.py:
class MyGenericConsumer(WebsocketConsumer):group = 'default'group_prefix = ''def connection_groups(self, **kwargs):return ['_'.join(self.group_prefix, self.group)]
routings.py:
from . import consumerschannel_routing = [consumers.MyGenericConsumer.as_route(path=r"^/path/1/",attrs={
'group': 'one', 'group_prefix': 'pre'}),consumers.MyGenericConsumer.as_route(path=r"^/path/2/",attrs={
'group': 'two', 'group_prefix': 'public'}),
]
这里通过attrs参数重新定义了MyGenericConsumer实例中的对应的类属性。(这个例子其实可以用path中的正则来处理。)
延时发送的意义在于当前的任务不能处理,或者需要延时处理,等待一段时间之后再重试,这样就需要延时发送消息。Channels提供channels.delay的应用来处理这种场景。首先添加这个应用到INSTALLED_APPS中:
INSTALLED_APPS = [...'channels','channels.delay'
]
然后执行命令:
python manage.py rundelay
这个命令执行之后会监听asgi.delay的channels。可以通过如下代码延时发送消息。
from channels import Channeldelayed_message = {'channel': 'example_channel','content': {
'x': 1},'delay': 10 * 1000
}
Channel('asgi.delay').send(delayed_message, immediately=True)
这里就是10s之后,发送{'x':1}到name为example_channel的channel中去。这个功能配合自定义管理命令,可以实现每隔多长时间执行一次的后台定时任务。
关于使用channels进行后台任务的内容,敬请期待吧!
- 上一篇: Channels ——django实时推送系统(进阶篇一)
- 下一篇: Channels ——django实时推送系统(实战篇一)-django后台发送邮件