一、SSE 服务端消息推送
SSE 是 Server-Sent Events 的简称, 是一种服务器端到客户端(浏览器)的单项消息推送。对应的浏览器端实现 Event Source 接口被制定为HTML5 的一部分。相比于 WebSocket,服务器端和客户端工作量都要小很多、简单很多,而 Tornado 又是Python中的一款优秀的高性能web框架,本文带领大家一起实践下 Tornado SSE 的实现。
本文主要探索两个方面的实践:一个是客户端发送请求,服务端的返回是分多次进行传输的,直到传输完成,这种情况下请求结束后,就可以考虑关闭 SSE了,所以这种连接可以认为是暂时的。另一种是由服务端在特定的时机下主动推送消息给到客户端,推送的时机具有不确定性,随时性,所以这种情况下需要客户端和服务端保持长久连接。
本次使用的 Tornado 版本:
tornado==6.3.2
二、短暂性场景下的 SSE 实现
短暂性场景下就是对应上面的第一点,客户端主动发送请求后,服务端分多次传输,直到完成,数据获取完成后连接就可以断开了,适用于一些接口复杂,操作步骤多的场景,可以提前告诉客户端现在进行到了哪一步了,并且这种方式也有利于服务端的横向扩展。
在 Tornado 中实现,需要注意的是要关闭 _auto_finish ,这样的话就不会被框架自己主动停止连接了,下面是一个实现的案例:
import time from tornado.concurrent import run_on_executor from tornado.web import RequestHandler import tornado.gen from concurrent.futures.thread import ThreadPoolExecutor class SSE(RequestHandler): def initialize(self): # 关闭自动结束 self._auto_finish = False print("initialize") def set_default_headers(self): # 设置为事件驱动模式 self.set_header('Content-Type', "text/event-stream") # 不使用缓存 self.set_header('Content-Control', "no-cache") # 保持长连接 self.set_header('Connection', "keep-alive") # 允许跨域 self.set_header('Access-Control-Allow-Origin', "*") def prepare(self): # 准备线程池 self.executor = self.application.pool @tornado.gen.coroutine def get(self): result = yield self.doHandle() self.write(result) # 结束 self.finish() @run_on_executor def doHandle(self): tornado.ioloop.IOLoop.current() # 分十次推送信息 for i in range(10): time.sleep(1) self.flush() self.callback(f"current: {i}") return f"data: end\n\n" def callback(self, message): # 事件推送 message = f"data: {message}\n\n" self.write(message) self.flush() class Application(tornado.web.Application): def __init__(self): handlers = [ ("/sse", SSE), ("/(.*)$", tornado.web.StaticFileHandler, { "path": "resources/static", "default_filename": "index.html" }) ] super(Application, self).__init__(handlers) self.pool = ThreadPoolExecutor(200) def startServer(port): app = Application() httpserver = tornado.httpserver.HTTPServer(app) httpserver.listen(port) print(f"Start server success", f"The prot = {port}") tornado.ioloop.IOLoop.current().start() if __name__ == '__main__': startServer(8020)
运行后可以到浏览器访问:http://localhost:8020/sse,此时就可以看到服务端在不断地推送数据过来了:
那如何在前端用 JS 获取数据呢,前面提到在 JS 层面,有封装好的 Event Source 组件可以直接拿来使用,例如:
测试服务器推送技术
运行后可以看到服务端分阶段推送过来的数据:
三、长连接场景下的 SSE 实现
上面实现了客户端请求后,分批次返回,但是有些情况下是客户端连接后没有东西返回,而是在某个特定的时机下返回给某几个客户端,所以这种情况,我们需要和客户端保持长久的连接,同时进行客户端连接的缓存,因为同时有可能有 100 个用户,但是推送时可能只需要给 10 个用户推送,这种方式相当于将一个客户端和一个服务端进行了绑定,一定程度上不利于服务端的横向扩展,但也可以通过一些消息订阅的方式解决类似问题。
下面是一个实现案例:
import time from tornado.concurrent import run_on_executor from tornado.web import RequestHandler import tornado.gen from concurrent.futures.thread import ThreadPoolExecutor # 单例 def singleton(cls): instances = {} def wrapper(*args, **kwargs): if cls not in instances: instances[cls] = cls(*args, **kwargs) return instances[cls] return wrapper # 订阅推送工具类 @singleton class Pusher(): def __init__(self): self.clients = {} def add_client(self, client_id, callback): if client_id not in self.clients: self.clients[client_id] = callback print(f"{client_id} 连接") def send_all(self, message): for client_id in self.clients: callback = self.clients[client_id] print("发送消息给:", client_id) callback(message) def send(self, client_id, message): callback = self.clients[client_id] print("发送消息给:", client_id) callback(message) class SSE(RequestHandler): # 定义推送者 pusher = Pusher() def initialize(self): # 关闭自动结束 self._auto_finish = False print("initialize") def set_default_headers(self): # 设置为事件驱动模式 self.set_header('Content-Type', "text/event-stream") # 不使用缓存 self.set_header('Content-Control', "no-cache") # 保持长连接 self.set_header('Connection', "keep-alive") # 允许跨域 self.set_header('Access-Control-Allow-Origin', "*") @tornado.gen.coroutine def get(self): # 客户端唯一标识 client_id = self.get_argument("client_id") self.pusher.add_client(client_id, self.callback) def callback(self, message): # 事件推送 message = f"data: {message}\n\n" self.write(message) self.flush() # 定义推送接口,模拟推送 class Push(RequestHandler): # 定义推送者 pusher = Pusher() def prepare(self): # 准备线程池 self.executor = self.application.pool @tornado.gen.coroutine def get(self): # 客户端标识 client_id = self.get_argument("client_id") # 推送的消息 message = self.get_argument("message") result = yield self.doHandle(client_id, message) self.write(result) @run_on_executor def doHandle(self, client_id, message): tornado.ioloop.IOLoop.current() self.pusher.send(client_id, message) return "success" class Application(tornado.web.Application): def __init__(self): handlers = [ ("/sse", SSE), ("/push", Push), ("/(.*)$", tornado.web.StaticFileHandler, { "path": "resources/static", "default_filename": "index.html" }) ] super(Application, self).__init__(handlers) self.pool = ThreadPoolExecutor(200) def startServer(port): app = Application() httpserver = tornado.httpserver.HTTPServer(app) httpserver.listen(port) print(f"Start server success", f"The prot = {port}") tornado.ioloop.IOLoop.current().start() if __name__ == '__main__': startServer(8020)
这里我定义了一个 Pusher 订阅推送工具类,用来存储客户端的连接,以及给指定客户端或全部客户端发送消息,然后我又定义 Push 接口,模拟不定时的指定客户端发送信息的场景。
同样前端也要修改,需要给自己定义 client_id ,例如:
测试服务器推送技术
这里我用 uuid 模拟客户端的唯一ID,在真实使用时可不要这么做。
下面使用浏览器打开三个页面,可以看到三个不同的 client_id :
在服务端的日志中也能看到这三个客户端的连接:
下面调用 push 接口来给任意一个客户端发送消息,例如这里发给client_id = 2493045e-84dd-4118-8d96-0735c4ac186b 的用户 :
下面看到 client_id 是 2493045e-84dd-4118-8d96-0735c4ac186b的页面:
已经成功收到推送的消息,反之看另外两个:
都没有消息,到这里就实现了长连接下不定时的服务端消息推送方案。
猜你喜欢
- 12天前(三亚海棠湾君悦度假酒店)三亚海棠湾君悦酒店暑期夏令营悦趣海岛游招募中
- 12天前(fender japan hybrid)Fender东京旗舰店盛大开幕在即,开售商品和店内服务提前揭晓
- 12天前(河南省文旅大会精神)2025河南省文化旅游发展大会新闻发布会在郑州召开
- 12天前(天气预报 华为)2025HDC华为天气上新系统级天气智能体,引领更智能的气象服务
- 12天前(071 圣安东尼奥)秋季 圣安东尼奥交出了私藏活动清单
- 12天前(曼谷丽思卡尔顿公寓价格)曼谷丽思卡尔顿酒店盛大启幕,开创泰国奢华雅致新纪元
- 12天前(安岚度假村及酒店推出"山海之约"目的地婚礼计划)安岚度假村及酒店推出"山海之约"目的地婚礼计划
- 12天前(殷建祥简历)全国十大牛商解码:殷建祥如何用178天技术突围打造星空梦星空房
- 12天前(内蒙古冬季旅游攻略)内蒙古冬日奇遇:携程租车带你策马踏雪
- 12天前(锦江 iu)锦江荟APP原生鸿蒙版正式上线打造全场景旅行服务新体验
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章