基于websocket的简单广播系统

在年初的时候,我们有点儿小迷茫,于是也跟风去做了一些轻娱乐类的小游戏。
那时为了实战对战,想到需要一个实时性很强的技术实现,于是我去实现了一个websocket server,没想到后来这些小程序没有成,但是我们的这个web socket server 演化得无处不在。下面介绍一下这个技术实现。

看理论肯定会有点拗口是不是,我们直接上代码就得了。我们现在假设有这么一个用户付款的逻辑,在写用户付款事件时,我们事先并不知道以后还需要加什么逻辑,于是我们先把这个行为广播出去。以下是伪代码:

    req := httplib.Post("https://ws.app.12zan.net/eventcast/user/5905e89db43fec42e3055df05ff72afe")
    text, er := zanjson.Encode(order)
    if er != nil {  
        log.Println(ev)
        return
    }
    req.Param("data", string(text))
    resp,_ = req.Response()

好了,现在,每当有用户付款时,这个用户系统都会往/eventcast/user/5905e89db43fec42e3055df05ff72afe这个频道广播一条消息。但是很遗憾,目前没有客户端订阅这类消息,所有的消息都被丢弃了。

有一天,我们英明神武的老板决定要加一个通知,每当有一个新的用户付款时,都给公司的同胞们发一个邮件通知一下,我们获得了新的付费用户,好让大家小开心一把,尤其是第一个试用客户付费的时候,我们肯定都要开心地跳起来。这时我们如果去改线上运行好的付款系统,还是有点儿风险的,一旦有修改,我们就得走一下测试流程,不然万一有问题不是影响公司发财了吗。没关系,我们之前不是已经把付款事件广播出来了吗,我们现在用起来。写这么一段js,线上运行起来,就好了。

const webSocket = require('ws');
let ws = new webSocket("wss://ws.app.12zan.net/eventcast/user/5905e89db43fec42e3055df05ff72afe");
ws.on('open', function open() {
    console.log("connected");
});
ws.on('message', function incoming(data) {
    let user = JSON.parse(data);
    Mail.send("一个叫"+user.name+"的好心人支付了"+user.amount+"元,让主赞美他!");
});

好了,现在一旦有人付款,我们全公司都能收到一个邮件,及时得到这一好消息了。让我们小小地庆祝一下吧。

接下来又过了几天,我们想改进一下体验,用户一旦付款成功,就发送一条短信,告知用户他的有效期和我们的24小时客服电话;只需要这么一段代码部署起来运行就好了, 之前的任何代码都不用动:

const webSocket = require('ws');
let ws = new webSocket("wss://ws.app.12zan.net/eventcast/user/5905e89db43fec42e3055df05ff72afe");
ws.on('open', function open() {
    console.log("connected");
});
ws.on('message', function incoming(data) {
    let user = JSON.parse(data);
    let expiresAt = (zan.Date.now().add("+365 day").format("YYYY-mm-dd"));
    SMS.send(user.Mobile,"尊敬的"+user.name+",您成功购买了十二赞旗舰版,有效期至"+expiresAt+",请登陆:https://www.12zan.cn 查看,如有任何疑问,欢迎致电4006681102");
});

发送通知邮件和发送告知短信,都基于用户付款动作,但是发邮件和发短信的代码完全隔离,相互之间出完全不知道对方的存在。

是不是很赞?那我们接下来梳理一下逻辑。

概念及主要逻辑

也许我们来不及去翻看websocket的定义,但是我们可以简单地理解,Websocket是对HTTP协议的一个扩展升级,在发起连接时,HTTP部分都是有效的,只是连接成功以后,服务端和客户端的连接不断,双方可以双向数据传输,且服务端可以主动向客户端推送数据。

我们看一次Websocket发起连接的过程(来自维基百科):

客户端向服务端发起连接:

GET / HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: example.com
Origin: http://example.com
Sec-WebSocket-Key: sN9cRrP/n9NdMgdcy2VJFQ==
Sec-WebSocket-Version: 13

服务端的返回:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: fFBooB7FAkLlXgRSz0BT3v4hq5s=
Sec-WebSocket-Location: ws://example.com/

在HTTP协议中常见的字段,如Cookies,Host等,依然有效。

但是具体到我们的应用上,十二赞的这个websocket server实现了两个小目标【多遗憾了,并没有赚到两个亿】:
1. 我们实现的是一个广播系统,一个广播系统意味着一个地方去发送数据,n多个接受端来接受数据。要支持非常多的客户端同时连上数据来实时接受数据。我们最终的server端的实现,全内存实现,没有用redis或是MySQL类似的数据库,就是为了实现超多客户端的支持。
2. 我们希望采用最简单、最通用的文案,并且,非常高效,支持非常多的客户端同时连接,我们认为http协议更简单,所以在发送的时候,我们是走http协议来发送数据的。并且,没有任何安全上的设计,如果数据很重要,请自行加密之后发送。

当然我们也有一些遗憾:
1. 允许数据丢失。有得必有失,我们允许一个比例的信息丢失。产生数据丢失时,不影响主逻辑。就像刚才的例子,发送邮件通知我们有新付款的这个事件没有触发并没有关系,我们到下午才发现有新用户付款,这时再去开香槟也不迟:(。
2. 容忍时序错乱。像刚才的例子,有新用户付款时,是先告诉我们全体同事有新付款,还是先给用户发送一条短信,并不那么重要。

好了,回到我们的系统,我们给一点点总结。

我们定义,每个websocket的入口,都是一个URL;去掉协议和HOST部分,剩下的PATH部分代表了不同的频道。比如,发起websocket时连接到ws://ws.app.12zan.net/channel/hello,那么这个频道地址就是/channel/hello;所有连接到ws.app.12zan.net/channel/hello的websocket客户端,他们会收到一模一样的消息,我们称之为订阅。

同时,为了简化发起数据的过程,我们还在websocket server中定义:当一个http 的客户端,以POST方式请求某一个地址时,我们截取URL中的PATH部分,得到频道名,并取POST的数据中的data域,作为要广播的数据,将之广播到相应的频道。

在十二赞的应用:

这个广播系统,在十二赞的整个技术架构中,后来应用的特别广。
比如,我们的部署系统zeus,在网页端实现了一个客户端,当服务端有应用重启、关闭、启动时,都会弹出消息通知。任何在打开了这个系统的网页的人都能看到。比如我和同事小王都正在zeus的网页上,我新建了一个search系统的一个节点,启动完毕的时候,我和小王会收到通知,在第三号服务器上新启了一个search系统的节点。我在操作,很关心这个,所心这时我可以放心去继续我的工作。小王正要在三号机器上新部署一个系统,他收到这个通知后,觉得这个机器可能会很忙,于是把自己的新实例部署在了四号机器上。

再比如,我们的日志服务器,担负着收集所有服务器上日志的使命。但是如果它挂掉了呢?于是我们在这个日志服务器上跑了一个定时器,每5秒钟向某个频道广播一条心跳消息,告诉世界自己还活着。然后另行跑了一个进程,收听这个频道的广播,如果连续30秒没有收到这个心跳包,证明这个日志服务器挂掉了,就发一条报警短信,通知同学去看看这个服务。

再比如,我们在日志服务上的应用,参见这里:十二赞日志系统简介

发表评论