websocket的使用

# websocket

# 各种协议的关系比较

长连接:一个连接上可以连续发送多个数据包,在连接期间,如果没有数据包发送,需要双方发链路检查包。

TCP/IP:TCP/IP属于传输层,主要解决数据在网络中的传输问题,只管传输数据。但是那样对传输的数据没有一个规范的封装、解析等处理,使得传输的数据就很难识别,所以才有了应用层协议对数据的封装、解析等,如HTTP协议。

HTTP:HTTP是应用层协议,封装解析传输的数据。 从HTTP1.1开始其实就默认开启了长连接,也就是请求header中看到的Connection:Keep-alive。但是这个长连接只是说保持了(服务器可以告诉客户端保持时间Keep-Alive:timeout=200;max=20;)这个TCP通道,直接Request - Response,而不需要再创建一个连接通道,做到了一个性能优化。但是HTTP通讯本身还是Request - Response。

Socket与HTTP不一样,socket不是协议,它是在程序层面上对传输层协议(可以主要理解为TCP/IP)的接口封装。 我们知道传输层的协议,是解决数据在网络中传输的,那么socket就是传输通道两端的接口。所以对于前端而言,Socket也可以简单的理解为对TCP/IP的抽象协议

WebSocket: WebSocket是包装成了一个应用层协议作为socket,从而能够让客户端和远程服务端通过web建立全双工通信。websocket提供ws和wss两种URL方案。跟socket的关系,类似于javascript跟java的关系

Websocket是html5提出的一个协议规范,是为解决客户端与服务端实时通信。本质上是一个基于tcp,先通过HTTP/HTTPS协议发起一条特殊的http请求进行握手后创建一个用于交换数据的TCP连接。只需要要做一个握手的动作,在建立连接之后,双方可以在任意时刻,相互推送信息。同时,服务器与客户端之间交换的头信息很小。

# 轮询、长轮询和iframe流的比较

# 轮询(polling)

轮询是客户端和服务器之间会一直进行连接,每隔一段时间就询问一次。其缺点也很明显:连接数会很多,一个接受,一个发送。而且每次发送请求都会有Http的Header,会很耗流量,也会消耗CPU的利用率

# 优缺点
  • 优点:实现简单,无需做过多的更改
  • 缺点:轮询的间隔过长,会导致用户不能及时接收到更新的数据;轮询的间隔过短,会导致查询请求过多,增加服务器端的负担

# 长轮询(long-polling)

长轮询是对轮询的改进版,客户端发送HTTP给服务器之后,看有没有新消息,如果没有新消息,就一直等待。当有新消息的时候,才会返回给客户端。

在某种程度上减小了网络带宽和CPU利用率等问题。由于http数据包的头部数据量往往很大(通常有400多个字节),但是真正被服务器需要的数据却很少(有时只有10个字节左右),这样的数据包在网络上周期性的传输,难免对网络带宽是一种浪费

# 优缺点
  • 优点:比 Polling 做了优化,有较好的时效性
  • 缺点:保持连接会消耗资源; 服务器没有返回有效数据,程序超时。

# iframe流(streaming)

iframe流方式是在页面中插入一个隐藏的iframe,利用其src属性在服务器和客户端之间创建一条长连接,服务器向iframe传输数据(通常是HTML,内有负责插入信息的javascript),来实时更新页面。

# 优缺点
  • 优点:消息能够实时到达;浏览器兼容好
  • 缺点:服务器维护一个长连接会增加开销;IE、chrome、Firefox会显示加载没有完成,图标会不停旋转。

# SSE(Server-Sent Events)

SSE(Server-Sent Events)与长轮询机制类似,区别是每个连接不只发送一个消息。客户端发送一个请求,服务端保持这个连接直到有新消息发送回客户端,仍然保持着连接,这样连接就可以支持消息的再次发送,由服务器单向发送给客户端。然而IE直到11都不支持;

# 各种图示流程及示例

12-18-47 12-19-45

# Web 实时推送技术的比较

14-59-17

上述代码中,客户端只请求一次,然而服务端却是源源不断向客户端发送数据,这样服务器维护一个长连接会增加开销。以上我们介绍了三种实时推送技术,然而各自的缺点很明显,使用起来并不理想,接下来我们着重介绍另一种技术--websocket,它是比较理想的双向通信技术

综上所述:

Websocket协议不仅解决了HTTP协议中服务端的被动性,即通信只能由客户端发起,

也解决了数据同步有延迟的问题,同时还带来了明显的性能优势,所以websocket 是Web 实时推送技术的比较理想的方案,但如果要兼容低版本浏览器,可以考虑用轮询来实现。

# websocket简介

WebSocket作为一种通信协议,属于服务器推送技术的一种,IE10+支持。

在WebSocket协议之前,有三种实现双向通信的方式:轮询(polling)长轮询(long-polling)iframe流(streaming)

WebSocket是一种全新的协议,随着HTML5草案的不断完善,越来越多的现代浏览器开始全面支持WebSocket技术了,它将TCP的Socket(套接字)应用在了webpage上,从而使通信双方建立起一个保持在活动状态连接通道。

WebSocket提供了一个文明优雅的全双工通信方案。一般适合于对数据的实时性要求比较强的场景,如通信、股票、直播、共享桌面,特别适合于客户端与服务频繁交互的情况下,如聊天室、实时共享、多人协作等平台。

一旦Web服务器与客户端之间建立起WebSocket协议的通信连接,之后所有的通信都依靠这个专用协议进行。通信过程中可互相发送JSON、XML、HTML或图片等任意格式的数据。由于是建立在HTTP基础上的协议,因此连接的发起方仍是客户端,而一旦确立WebSocket通信连接,不论服务器还是客户端,任意一方都可直接向对方发送报文

# WebSocket的特点

  1. 支持双向通信,实时性更强
  2. 建立在 TCP 协议之上,服务器端的实现比较容易。
  3. 与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
  4. 数据格式比较轻量,性能开销小,通信高效。服务器与客户端之间交换的标头信息大概只有2字节;
  5. 可以发送文本,也可以发送二进制数据
  6. 没有同源限制,客户端可以与任意服务器通信。
  7. 协议标识符是 ws(如果加密,则为wss),服务器网址就是 URL; 示例:ws://xxx.com:80/some/path
  8. 不用频繁创建及销毁TCP请求,减少网络带宽资源的占用,同时也节省服务器资源; 减少通信量:只要建立起WebSocket连接,就希望一直保持连接状态。和HTTP相比,不但每次连接时的总开销减少,而且由于WebSocket的首部信息很小,通信量也相应减少了
  9. WebSocket是纯事件驱动的,一旦连接建立,通过监听事件可以处理到来的数据和改变的连接状态,数据都以帧序列的形式传输。服务端发送数据后,消息和事件会异步到达。
  10. 无超时处理。

# Websocket与Http的比较

# HTTP的局限性

  • HTTP是半双工协议,也就是说,在同一时刻数据只能单向流动,客户端向服务器发送请求(单向的),然后服务器响应请求(单向的)。
  • 服务器不能主动推送数据给浏览器。这就会导致一些高级功能难以实现,诸如聊天室场景就没法实现。
  • WebSocket 与 HTTP 内部都是基于 TCP 协议,区别在于 HTTP 是单向的(单双工),WebSocket 是双向的(全双工),协议是 ws://wss:// 对应 http://https://,因为没有跨域限制,所以使用 file:// 协议也可以进行通信。

# 通讯过程

# 图示http跟websocket流程及示例

14-50-55

分析:

  • 一个HTTP的通信生命周期通过 Request 来界定,也就是一个 Request 一个 Response ,那么在 HTTP1.0 中,这次HTTP请求就结束了。
  • HTTP1.1中进行了改进,有了一个 keep-alive,在一个HTTP连接中,可以发送多个Request,接收多个Response,也就是合并多个请求。但是一个Request只能对应一个Response,而且这个Response是被动的,不能主动发起。
  • Websocket 其实是一个新协议,但是为了兼容现有浏览器的握手规范而借用了HTTP的协议来完成一部分握手。WebSocket编程遵循一个异步编程模型,只需要对WebSocket对象增加回调函数就可以监听事件。
  • 相对于传统的HTTP每次请求-应答都需要客户端与服务端建立连接的模式,WebSocket是类似Socket的TCP长连接的通讯模式,一旦WebSocket连接建立后,后续数据都以帧序列的形式传输。在客户端断开WebSocket连接或Server端断掉连接前,不需要客户端和服务端重新发起连接请求。在海量并发和客户端与服务器交互负载流量大的情况下,极大的节省了网络带宽资源的消耗,有明显的性能优势,且客户端发送和接受消息是在同一个持久连接上发起,实时性优势明显

# webSocket与传统的http的优势

  • 客户端与服务器只需要一个TCP连接,比http长轮询使用更少的连接
  • webSocket服务端可以推送数据到客户端
  • 更轻量的协议头,减少数据传输量

# 通讯原理

当客户端要和服务端建立 WebSocket 连接时,在客户端和服务器的握手过程中,客户端首先会向服务端发送一个 HTTP 请求,包含一个 Upgrade 请求头来告知服务端客户端想要建立一个 WebSocket 连接

在客户端建立一个 WebSocket 连接非常简单:let ws = new WebSocket('ws://localhost:9000');

类似于 HTTP 和 HTTPS,ws 相对应的也有 wss 用以建立安全连接,本地已 ws 为例。这时的请求头如下:

Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9
Cache-Control: no-cache
Connection: Upgrade	//表示该连接要升级协议
Cookie: _hjMinimizedPolls=358479; ts_uid=7852621249; CNZZDATA1259303436=1218855313-1548914234-%7C1564625892; csrfToken=DPb4RhmGQfPCZnYzUCCOOade; JSESSIONID=67376239124B4355F75F1FC87C059F8D; _hjid=3f7157b6-1aa0-4d5c-ab9a-45eab1e6941e; acw_tc=76b20ff415689655672128006e178b964c640d5a7952f7cb3c18ddf0064264
Host: localhost:9000
Origin: http://localhost:9000
Pragma: no-cache
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: 5fTJ1LTuh3RKjSJxydyifQ==	//与响应头 Sec-WebSocket-Accept 相对应
Sec-WebSocket-Version: 13	//表示websocket协议的版本
Upgrade: websocket	//表示要升级到websocket协议
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36
1
2
3
4
5
6
7
8
9
10
11
12
13

响应头如下:

Connection: Upgrade
Sec-WebSocket-Accept: ZUip34t+bCjhkvxxwhmdEOyx9hE=
Upgrade: websocket
1
2
3

此时响应行(General)中可以看到状态码 status code 是 101 Switching Protocols , 表示该连接已经从 HTTP 协议转换为 WebSocket 通信协议。 转换成功之后,该连接并没有中断,而是建立了一个全双工通信,后续发送和接收消息都会走这个连接通道

注意,请求头中有个 Sec-WebSocket-Key 字段,和相应头中的 Sec-WebSocket-Accept 是配套对应的,它的作用是提供了基本的防护,比如恶意的连接或者无效的连接。Sec-WebSocket-Key 是客户端随机生成的一个 base64 编码,服务器会使用这个编码,并根据一个固定的算法:

GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";// 一个固定的字符串
accept = base64(sha1(key + GUID));// key 就是 Sec-WebSocket-Key 值,accept 就是 Sec-WebSocket-Accept 值
1
2

客户端拿到服务端响应的 Sec-WebSocket-Accept 后,会拿自己之前生成的 Sec-WebSocket-Key 用相同算法算一次,如果匹配,则握手成功。然后判断 HTTP Response 状态码是否为 101(切换协议),如果是,则建立连接,大功告成。

# webSocket协议升级过程

首先,WebSocket连接必须由浏览器发起,因为请求协议是一个标准的HTTP请求,格式如下:

GET ws://localhost:3000/ws/chat HTTP/1.1
Host: localhost
Upgrade: websocket
Connection: Upgrade
Origin: http://localhost:3000
Sec-WebSocket-Key: client-random-string
Sec-WebSocket-Version: 13
1
2
3
4
5
6
7

该请求和普通的HTTP请求有几点不同

  • GET请求的地址不是类似/path/,而是以ws://开头的地址;
  • 请求头Upgrade: websocket和Connection: Upgrade表示这个连接将要被转换为WebSocket连接
  • Sec-WebSocket-Key是用于标识这个连接,并非用于加密数据;
  • Sec-WebSocket-Version指定了WebSocket的协议版本。

随后,服务器如果接受该请求,就会返回如下响应:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: server-random-string
1
2
3
4

该响应代码101表示本次连接的HTTP协议即将被更改,更改后的协议就是Upgrade: websocket指定的WebSocket协议。

# HTTP与WS协议结构

WebSocket协议标识符用 ws表示。wss协议表示加密的WebSocket协议,对应HTTPs协议。结构如下:

  • HTTP: TCP > HTTP
  • HTTPS: TCP > TLS > HTTP
  • WS: TCP > WS
  • WSS: TCP > TLS > WS
# WebSocket握手报文

一个浏览器发出的WebSocket请求报文类似于:

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
Origin: http://example.com
1
2
3
4
5
6
7
8

HTTP1.1协议规定,Upgrade头信息表示将通信协议从HTTP/1.1转向该项所指定的协议

  • Connection:Upgrade表示浏览器通知服务器,如果可以,就升级到webSocket协议
  • Origin用于验证浏览器域名是否在服务器许可的范围内。
  • Sec-WebSocket-Key则是用于握手协议的密钥,是浏览器生成的Base64编码的16字节随机字符串
  • Sec-WebSocket-Protocol是一个用户定义的字符串,用来区分同URL下,不同的服务所需要的协议
  • Sec-WebSocket-Version是告诉服务器所使用的协议版本。

服务端WebSocket回复报文:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=
Sec-WebSocket-Protocol: chat
Sec-WebSocket-Origin: null
Sec-WebSocket-Location: ws://example.com/
1
2
3
4
5
6
7
  • 服务器端同样用 Connection:Upgrade通知浏览器,服务端已经成功切换协议。
  • Sec-WebSocket-Accept是经过服务器确认并且加密过后的 Sec-WebSocket-Key
  • Sec-WebSocket-Location表示进行通信的WebSocket网址。
  • Sec-WebSocket-Protocol表示最终使用的协议。

在这样一个类似于HTTP通信的握手结束之后,下面就按照WebSocket协议进行通信了。客户端与服务器之间不会再发生HTTP通信,一切由WebSocket 协议接管。

# 事件

const ws=new WebSocket('ws://localhost:8080')
switch (ws.readyState) {//WebSocket实例对象类似于XHR有个的只读属性 readyState来指示连接的当前状态:
  case WebSocket.CONNECTING:
    // ...
    break;
  case WebSocket.OPEN:
    // ...
    break;
  case WebSocket.CLOSING:
    // ...
    break;
  case WebSocket.CLOSED:
    // ...
    break;
  default:
    //  this never happens
    break;
}
ws.onopen = function () {//实例对象的 onopen属性,用于指定连接成功后的回调函数。
  ws.send('Hello Server!');
}
ws.addEventListener('open', function (event) {/如果要指定多个回调函数,可以 addEventListener。
  ws.send('Hello Server!');
});
ws.onclose = function(event) {//实例对象的 onclose属性,用于指定连接关闭后的回调函数。
    const { code, reason, wasClean} = event
    // ...
};
ws.addEventListener('close', function(event) {
    const { code, reason, wasClean} = event
    // ...
})
ws.onmessage = function(event) {//实例对象的 onmessage属性,用于指定收到服务器数据后的回调函数。
  const { data } = event;
  // ...
};
ws.addEventListener('message', function(event) {
  const { data } = event; 
  // ...
});
ws.binaryType = 'blob';                // 收到的是 Blob 数据
ws.binaryType = 'arraybuffer';         // 收到的是 ArrayBuffer 数据
ws.onmessage = function(event){//注意,服务器数据可能是文本,也可能是二进制数据(blob对象或Arraybuffer对象)。
  if(typeof event.data === String) {
    // string
  }
  if(event.data instanceof ArrayBuffer){
    const { data: buffer } = event;
    // array buffer
  }
}
ws.onerror = function(event) {
  // handle error event
};
ws.addEventListener("error", function(event) {
  // handle error event
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

# Websocket/Tcp消息帧粘包,拆包及处理方法

TCP是个流协议,它存在粘包问题

TCP是一个基于字节流的传输服务,"流"意味着TCP所传输的数据是没有边界的。这不同于UDP提供基于消息的传输服务,其传输的数据是有边界的。TCP的发送方无法保证对等方每次接收到的是一个完整的数据包。tcp再传输数据时,发送消息并非一包一包发送,存在粘包、拆包的情况;

websocket 底层使用的tcp 协议。 当一次发送数据过长时,tcp 会把数据封成多个包发送;同样当数据过短时, 会把数据合并成一个包发送,这种现象就是粘包。

# 粘包、拆包表现形式

现在假设客户端向服务端连续发送了两个数据包,用packet1和packet2来表示,那么服务端收到的数据可以分为三种,现列举如下:

  • 第一种情况(正常情况)

    接收端正常收到两个数据包,即没有发生拆包和粘包的现象,此种情况不在本文的讨论范围内。

  • 第二种情况(粘包:两帧数据放在一个tcp消息包中)

    接收端只收到一个数据包,由于TCP是不会出现丢包的,所以这一个数据包中包含了发送端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于接收端来说很难处理。

  • 第三种情况(拆包:一帧数据被拆分在两个tcp消息包中)

    这种情况有两种表现形式,如下图。接收端收到了两个数据包,但是这两个数据包要么是不完整的,要么就是多出来一块,这种情况即发生了拆包和粘包。这两种情况如果不加特殊处理,对于接收端同样是不好处理的。

# 粘包、拆包发生原因

发生TCP粘包或拆包有很多原因,现列出常见的几点,可能不全面,欢迎补充,

1、要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。应用层调用write方法,将应用层的缓冲区中的数据拷贝到套接字的发送缓冲区。而发送缓冲区有一个SO_SNDBUF的限制,如果应用层的缓冲区数据大小大于套接字发送缓冲区的大小,则数据需要进行多次的发送。

2、待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。TCP所传输的报文段有MSS的限制,如果套接字缓冲区的大小大于MSS,也会导致消息的分割发送

3、要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包

4、接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包

5、使用更加复杂的应用层协议。

# 粘包、拆包解决办法

通过以上分析,我们清楚了粘包或拆包发生的原因,那么如何解决这个问题呢?解决问题的关键在于如何给每个数据包添加边界信息,常用的方法有如下几个:

1、发送端给每个数据包添加包首部,首部中应该至少包含数据包的长度,这样接收端在接收到数据后,通过读取包首部的长度字段,便知道每一个数据包的实际长度了。包头加上包体长度。包头是定长的4个字节,说明了包体的长度。接收对等方先接收包体长度,依据包体长度来接收包体。

2、发送端将每个数据包封装为固定长度(不够的可以通过补0填充)【推荐】,这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开来。如果每个消息的大小都是一样的,那么在接收对等方只要累计接收数据,直到数据等于一个定长的数值就将它作为一个消息。

3、可以在数据包之间设置边界,如添加特殊符号,这样,接收端通过这个边界就可以将不同的数据包拆分开。包尾加上\r\n标记。FTP协议正是这么做的。但问题在于如果数据正文中也含有\r\n,则会误判为消息的边界。

示例:

//当前发送方发送了两个包,两个包的内容如下:
123456789
ABCDEFGH

//粘包情况
123456789ABCDEFGH
//分包情况
12345
6789
ABCDE
FGH

//处理办法:
//给数据包的头尾加上标记
START123456789END
STARTABCDEFGHEND
//在数据包头部加上内容的长度
PACKAGELENGTH:0009123456789
PACKAGELENGTH:0008ABCDEFGH
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 心跳及重连机制

心跳机制是每隔一段时间会向服务器发送一个数据包,告诉服务器自己还活着,同时客户端会确认服务器端是否还活着,如果还活着的话,就会回传一个数据包给客户端来确定服务器端也还活着,否则的话,有可能是网络断开连接了。需要重连;

# 实现心跳检测的思路

每隔一段固定的时间,向服务器端发送一个ping数据,如果在正常的情况下,服务器会返回一个pong给客户端,如果客户端通过 onmessage事件能监听到的话,说明请求正常,这里我们使用了一个定时器,每隔3秒的情况下,如果是网络断开的情况下,在指定的时间内服务器端并没有返回心跳响应消息,因此服务器端断开了,因此这个时候我们使用

ws.close关闭连接,在一段时间后(在不同的浏览器下,时间是不一样的,firefox响应更快), 可以通过 onclose事件监听到。因此在onclose事件内,我们可以调用 reconnect事件进行重连操作。

客户端示例实现:在项目中直接使用插件即可,这只是为了演示一下什么是心跳,理解最基本的原理

socket.io..和pomelo的socket都实现了心跳..

1.服务端向客户端发送心跳..,客户端记录接受心跳的时间..

2.客户端每隔一段时间检查,服务端的心跳时间是否大于超时时间

默认情况下, socket.io 客户机将向服务器发送心跳每 15秒( 磅的心跳间隔 ), 如果服务器没有收到客户机 20秒( 心跳超时 ) 将考虑客户端断开连接。

<html>
<head>
  <meta charset="utf-8">
  <title>WebSocket Demo</title>
</head>
<body>
  <script type="text/javascript">
    // var ws = new WebSocket("wss://echo.websocket.org");
    /*
    ws.onerror = function(e) {
      console.log('已关闭');
    };
    ws.onopen = function(e) {
      console.log('握手成功');
      ws.send('123456789');
    }
    ws.onclose = function() {
      console.log('已关闭');
    }
    ws.onmessage = function(e) {
      console.log('收到消息');
      console.log(e);
    }
    */
    var lockReconnect = false;//避免重复连接
    var wsUrl = "wss://echo.websocket.org";
    var ws;
    var tt;
    function createWebSocket() {
      try {
        ws = new WebSocket(wsUrl);
        init();
      } catch(e) {
        console.log('catch');
        reconnect(wsUrl);
      }
    }
    function init() {
      ws.onclose = function () {
        console.log('链接关闭');
        reconnect(wsUrl);
      };
      ws.onerror = function() {
        console.log('发生异常了');
        reconnect(wsUrl);
      };
      ws.onopen = function () {
        //心跳检测重置
        heartCheck.start();
      };
      ws.onmessage = function (event) {
        //拿到任何消息都说明当前连接是正常的
        console.log('接收到消息');
        heartCheck.start();
        //heartCheck.reset();
      }
    }
    function reconnect(url) {
      if(lockReconnect) {
        return;
      };
      lockReconnect = true;
      //没连接上会一直重连,设置延迟避免请求过多
      tt && clearTimeout(tt);
      tt = setTimeout(function () {
        createWebSocket(url);
        lockReconnect = false;
      }, 4000);
    }
    //心跳检测
    var heartCheck = {
      timeout: 3000,
      timeoutObj: null,
      serverTimeoutObj: null,
        reset: function(){
        	clearTimeout(this.timeoutObj);
        	clearTimeout(this.serverTimeoutObj);
        	this.start();
      },
      start: function(){
        console.log('start');
        var self = this;
        this.timeoutObj && clearTimeout(this.timeoutObj);
        this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
        this.timeoutObj = setTimeout(function(){
          //这里发送一个心跳,后端收到后,返回一个心跳消息,
          ws.send("HeartBeatSamy");
          self.serverTimeoutObj = setTimeout(function() {
            console.log(111);
            console.log(ws);
            ws.close();
           //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
            // createWebSocket();
          }, self.timeout);

        }, this.timeout)
      }
    }
    createWebSocket(wsUrl);
  </script>
</body>
</html>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

# 简单示例

function connectWebsocket() {
    ws = new WebSocket('ws://localhost:9000');
    // 监听连接成功
    ws.onopen = () => {
        console.log('连接服务端WebSocket成功');
        ws.send(JSON.stringify(msgData));	// send 方法给服务端发送消息
    };
    // 监听服务端消息(接收消息)
    ws.onmessage = (msg) => {
        let message = JSON.parse(msg.data);
        console.log('收到的消息:', message)
        elUl.innerHTML += `<li class="b">小秋:${message.content}</li>`;
    };
    // 监听连接失败
    ws.onerror = () => {
        console.log('连接失败,正在重连...');
        connectWebsocket();
    };
    // 监听连接关闭
    ws.onclose = () => {
    	console.log('连接关闭');
    };
};
connectWebsocket();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const path = require('path');
const express = require('express');
const app = express();
const server = require('http').Server(app);
const WebSocket = require('ws');
const wss = new WebSocket.Server({ server: server });

wss.on('connection', (ws) => { 
  // 监听客户端发来的消息
  ws.on('message', (message) => {
    console.log(wss.clients.size);
    let msgData = JSON.parse(message);   
    if (msgData.type === 'open') {
      // 初始连接时标识会话
      ws.sessionId = `${msgData.fromUserId}-${msgData.toUserId}`;
    } else {
      let sessionId = `${msgData.toUserId}-${msgData.fromUserId}`;
      wss.clients.forEach(client => {
        if (client.sessionId === sessionId) {
          client.send(message);	 // 给对应的客户端连接发送消息
        }
      })  
    }
  })
  // 连接关闭
  ws.on('close', () => {
    console.log('连接关闭');  
  });
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# socket.io

# 事件

# 服务端事件

事件监听是实现通讯的基础,因此充分了解socket.io的事件,学习如何在正确的时候使用它们至关重要。在一些关键的的状态下,socket.io可以注册相应的事件,通过事件监听,我们可以在这些事件中作出反应,常用的事件如下:

  • connection——客户端成功连接到服务器。
  • message——捕获客户端send信息。。
  • disconnect——客户端断开连接。
  • error——发生错误。

# 客户端事件

较服务端而言,客户端提供更多的监听事件,在实时应用中,我们可以为这些事件注册监听并作出反应,例如:connect提示用户连接成功,disconnect时提示用户停止服务等等。

  • connection——成功连接到服务器。
  • connecting——正在连接。
  • disconnect——断开连接。
  • connect_failed——连接失败。
  • error——连接错误。
  • message——监听服务端send的信息。
  • reconnect_failed——重新连接失败。
  • reconnect——重新连接成功。
  • reconnecting——正在重连。

那么客户端socket发起连接时的顺序是怎么样的呢?当第一次连接时,事件触发顺序为: connectingconnect

当失去连接时,事件触发顺序为:disconnect → reconnecting → connecting → reconnect → connect

# 连接设置

var app = express();
var server = http.createServer(app);
var socketio = require('socket.io')(server, {
  serveClient: config.env !== 'production',
  path: '/socket.io-client',
  pingTimeout: 1000 * 10, // default 1000 * 10 * 6
  pingInterval: 1000 * 2, // default 1000 * 2.5
});

global.socketUsers = [];
require('./config/socketio').default(socketio);
1
2
3
4
5
6
7
8
9
10
11

# 心跳机制

默认情况下, socket.io 客户机将向服务器发送心跳每 25秒( 磅的心跳间隔 ), 如果服务器没有收到客户机 60秒( 心跳超时 ) 将考虑客户端断开连接。

# 服务器配置

Socket.IO服务器的配置项与engine.io相同。

  • pingTimeout (Number):等待响应包的时间,单位为毫秒。默认为60000。
  • pingInterval (Number):设定每隔在一定时间发送一个ping包,可以用于心跳包的设置。默认为25000。
  • maxHttpBufferSize (Number):消息最大大小,可用于避免DoS攻击。默认为 10E7。
  • allowRequest (Function):该配置项为一个函数。第一个参数是一个一个握手或连接请求。第二个参数是一个回调函数function(err,success),success是boolean值,false表示连接失败;err是错误码。该函数可以用于决定是否继续。
  • transports (<Array> String):指定传输的连接方式。默认为['polling', 'websocket']。
  • allowUpgrades (Boolean):是否允许升级传输协议。 默认为true。
  • cookie (String|Boolean):HTTP Cookie的名字。默认为io。

# 常用的事件及方法

io.on('connection', function(socket){
  socket.on('reply', function(){ /* */ }); // listen to the event;监听消息
    
  // the following two will emit to all the sockets connected to `/`
  io.sockets.emit('hi', 'everyone');
  io.emit('hi', 'everyone'); //short form; 上面的缩写;emit an event to all connected sockets; 一对多(广播),包括它自己
  
  socket.broadcast.emit('user connected')//Broadcasting messages; 一对多(广播),不包括它自己;
  socket.emit('request', /* */); //emit an event to the socket; 一对一
  
  socket.volatile.emit('bieber tweet', tweet); //发送易失性消息(new)
  socket.on('ferret', function (name, word, fn) {//发送和获取数据(确认)(new)
    fn(name + ' says ' + word);
  });
  
  var tweets = setInterval(function () {
    getBieberTweet(function (tweet) {
      socket.volatile.emit('bieber tweet', tweet);//发送易失性消息(new)
    });
  }, 100);
  socket.on('disconnect', function () {
    clearInterval(tweets);
  });
});
////发送和获取数据(确认)的客户端操作;
var socket = io(); // TIP: io() with no args does auto-discovery
  socket.on('connect', function () { // TIP: you can avoid listening on `connect` and listen on events directly too!
    socket.emit('ferret', 'tobi', 'woot', function (data) { // args are sent in order to acknowledgement function
      console.log(data); // data will be 'tobi says woot'
    });
  });

//群组房间消息方法:
//https://socket.io/docs/rooms-and-namespaces/
//Default room //Each Socket in Socket.IO is identified by a random, unguessable, unique identifier Socket#id.
io.on('connection', function (socket) {})
//服务器;指定某个房间
var chat = io
  .of('/chat')//命名空间,群里面;
  .on('connection', function (socket) {
    socket.emit('a message', {
        that: 'only'
      , '/chat': 'will get'
    });
  });
//设备端
var chat = io.connect('http://localhost/chat')
chat.on('connect', function () {
    chat.emit('hi!');
});

//Joining and leaving; join加入房间;leave离开房间;
io.on('connection', function(socket){
  socket.join('some room');
});
io.to('some room').emit('some event');//然后在广播或发射时简单地使用to或in(它们相同):
//断开连接后,socket会自动离开它们所属的所有通道,您无需进行任何特殊拆卸
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

# nginx中兼容设置

location / {
 # enable WebSockets
  proxy_http_version 1.1;
  proxy_set_header Upgrade $http_upgrade;
  proxy_set_header Connection "upgrade";
  
  proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
  proxy_set_header Host $host;
  proxy_pass   http://127.0.0.1:7001;

  # http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_bind
  # proxy_bind       $remote_addr transparent;
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 设备上下线处理

'use strict';
import _ from 'lodash';

function onDisconnect(/*socket*/) {}
function onConnect(socketIo, socket) {
  socket.on('info', data => {
    socket.log(JSON.stringify(data, null, 2));
  });
  require('../api/user/user.socket').register(socketIo, socket);
}

export default function(socketIo) {
  socketIo.on('connection', function(socket) {
    socket.address = `${socket.request.connection.remoteAddress}:${socket.request.connection.remotePort}`;
    socket.connectedAt = new Date();

    socket.log = function(...data) {
      console.log(`SocketIO ${socket.nsp.name} [${socket.address}]`, ...data);
    };

    // Call onDisconnect.
    socket.on('disconnect', () => {
      _.remove(global.socketUsers, {socketId: socket.id});
      socket.broadcast.emit('update-status', global.socketUsers.map(item => {
        return {
          socketId: item.socketId,
          mac: item.mac,
        };
      }));
      socket.broadcast.emit('client.offline', socket.mac);
      onDisconnect(socket);
      socket.log('DISCONNECTED', socket.id, socket.mac);
    });

    // Call onConnect.
    onConnect(socketIo, socket);
    socket.log('CONNECTED', socket.id);
  });
}

 socket.on('login', user => {
    if(user) {
      // global.socketUser[user] = socket;
      // global.socketUser[socket.id] = user;
      Cache.get(`userSocket_${user}`).then(list => {
        if (!list) {
          list = [];
        }
        list.push(socket.id);
        Cache.set(`userSocket_${user}`, list).then(() => {});
      });
      Cache.set(`socketUser_${socket.id}`, user).then(() => {});
    }
  });
  socket.on('disconnect', () => {
    // let _id = global.socketUser[socket.id];
    // if(global.socketUser[_id]) {
    //   delete global.socketUser[_id];
    // }
    Cache.get(`socketUser_${socket.id}`).then(_id => {
      Cache.get(`userSocket_${_id}`).then(list => {
        if (!list) {
          return;
        }
        _.remove(list, socket.id);
        if (list.length === 0) {
          Cache.del(`userSocket_${_id}`);
        } else {
          Cache.set(`userSocket_${_id}`, list).then(() => {});
        }
      });
      Cache.del(`socketUser_${socket.id}`);
    });
  });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

# pm2多进程共享数据

利用 socket.io-redis结合redis处理。不能再使用 global存储;

没有放在gloal中,现在放在redis中,方便pm2多进程部署项目: (gloal跟redis的对比)

负责路由消息的接口就是我们所说的适配器。您可以在socket.io-adapter之上实现自己的继承(通过继承),也可以使用我们在Redis之上提供的实现:socket.io-redis:

var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));
1
2
3

集群后带来的主要问题就是异地服务器和多进程间的通讯问题,如果你的应用都是基于单进程颗粒的,则不需要考虑这个问题,如果你的信息在多进程则存在共享和通讯的问题,则集群后要小心处理。

官方建议的方案是将数据(事件)集中存储(可以存储在内存或redis等持久化介质里),然后各服务端从集中存储的数据池里获取数据,其已经实现了一个抽象层 Socket.io-Adapter,这个抽象层使用内存,不建议直接使用,这里有人实现的redis的例子Socket.io-Redis(地址 (opens new window)),坏处是你需要先安装redis。:

上面说的都是socket进程间的通讯,如果你要从socket.io进程发消息给非socket.io进程,如http,则需要另外一个中间件socket.io-emitter(地址 (opens new window))。

实践修改:

import express from 'express';
import mongoose from 'mongoose';
mongoose.Promise = require('bluebird');
import http from 'http';
const redisAdapter = require('socket.io-redis');

var app = express();
var server = http.createServer(app);
let sport = 3131 + (parseInt(process.env.NODE_APP_INSTANCE) || 0);
var socketio = require('socket.io')(sport);
socketio.adapter(redisAdapter({host: config.redis.host, port: config.redis.port}));
Cache.delList('socketUser_*');
Cache.delList('userSocket_*');

global.socketUser = {};
global.centralList = {};
global.io = socketio;

require('./config/socketio').default(socketio);

function onDisconnect(socket) {
  const socketId = socket.id;
  if(global.socketUser[socketId]) {
    delete global.socketUser[socketId];
  }
  Cache.del(`socketUser_${socketId}`);
}

//使用;io.emit('hi', 'all sockets');
//实践:用户扫描二维码登录功能;注意这里生成的二维码手机端要在加载解析&处理,要不然直接获取有微信加的乱码问题;
  socket.on('loginQR', () => {
    let socketId = socket.id;
    let postTokenUrl = `${global.host}#/Mobile/MobileLogin?socketId=${socketId}&b=onyx`;
    QRCode.toDataURL(postTokenUrl)
    .then(url => {
      let cacheData = {
        socketId: socketId,
        imageUrl: url,
      };
      Cache.set(`loginQR_${socketId}`, cacheData, 60 * 10);//十分钟有效
      socket.emit('loginQRUrl', url);
      // global.socketUser[socketId] = socket;
      Cache.set(`socketUser_${socketId}`, socket.id).then(() => {});
    })
    .catch(err => {
      console.error(err);
    });
  });

//微信端登录扫描 ;登录扫描;微信端登录成功后请求,再发送token给web登录;
export function loginWechatScan(req, res) {
  let resData = req.body;
  let token = resData.token;
  let socketId = resData.socketId;
  return Cache.get(`loginQR_${socketId}`).then(data => {
    if(data) {
      if (socketId) {
        global.io.to(socketId).emit('loginWechatScan', token);
        return res.status(200).end();
      } else {
        return res.status(401).json(errorMsg.qr_overdue_error);
      }
    } else {
      return res.status(403).json(errorMsg.qr_overdue_error);
    }
  })
  .catch(handleError(res));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

参考:https://socket.io/docs/using-multiple-nodes/

# 不稳定性处理

由于socket.io相对于avpush推送没有那么稳定;要采取推送模式保证数据可以推送到设备上;

推拉模式并用保证数据推送的准确;

设备端要设置主动拉去数据;(结合bulll对队列推送,有后台ui保证数据准确送达)

// 根据 mac 和 time筛选一个会议
export function getMtgManual(req, res) {  
  const {mac} = req.query;
  let query = {};
  let nowDate = new Date(); //现以服务器时间为标准;
  let startTime = new Date(Date.parse(nowDate) + 24 * 60 * 60 * 1000); //1 * 24 * 60 * 60 * 1000 24小时
  query['$and'] = [{date: {'$gte': nowDate, '$lte': startTime} }];
  if(mac) {
    query['$and'].push({
      'pushUsers.mac': mac
    });
  }
  return Meeting.find(query).sort({ updatedAt: -1 })
    .populate({path: 'topics'})
    .then(res => {
      if(!(res && res.length > 0)) {
        return Promise.resolve('');
      }
      let meeting = res[0];
      meeting = meeting.toObject();
      let topics = meeting.topics.map(item => {
        let {file, book, _id, topicKid, personCode} = item;
        file.url = global.host + file.url;
        file.name = file.filename;
        file.type = 'pdf';
        return {
          _id,
          topicKid,
          file,
          docId: book,
          personCode,
        };
      });
      let user = meeting.pushUsers.find(item => {
        return item.mac === mac;
      });
      if(user.passcode) {
        user.isLead = true;
      } else {
        user.isLead = false;
      }
      let data = {
        module: 'meeting',
        title: meeting.title,
        mtgKid: meeting.mtgKid,
        date: meeting.date,
        location: meeting.location,
        pushUsers: meeting.pushUsers,
        user,
        topics,
      };
      return Promise.resolve(data);
    })
    .then(handleEntityNotFound(res))
    .then(respondWithResult(res))
    .catch(handleError(res));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

# 在eggjs中的使用

eggjs中的socket.io配置: eggjs-emqtt的配置跟这个配置一致;

egg-socket.io (opens new window) 内置了 socket.io-redis,在 cluster 模式下,使用 redis 可以较为简单的实现 clients/rooms 等信息共享

// {app_root}/config/config.${env}.js
exports.io = {
  init: { }, // passed to engine.io
  namespace: {
    '/': {
      connectionMiddleware: [],
      packetMiddleware: [],
    },
    '/example': {
      connectionMiddleware: [],
      packetMiddleware: [],
    },
  },
};

// {app_root}/config/config.${env}.js
exports.io = {
  redis: {
    host: { redis server host },
    port: { redis server port },
    auth_pass: { redis server password },
    db: 0,
  },
};

{
  "scripts": {
    "dev": "egg-bin dev --sticky",
    "start": "egg-scripts start --sticky"
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

开启 redis 后,程序在启动时会尝试连接到 redis 服务器 此处 redis 仅用于存储连接实例信息,参见 #server.adapter (opens new window)

注意: 如果项目中同时使用了 egg-redis, 请单独配置,不可共用。--sticky

# 用global跟redis存储的对比

egg中用global跟redis存储数据(map存储操作)的对比

//app.js初始化
async didLoad () {
    // global.routeList = {}
    // global.deviceList = {}
    require('./lib/sys/udp')(this.app)
  }
  async beforeClose () {
    await this.app.redis.del(this.app.config.redis.constant.routeList)//删除全部
    await this.app.redis.del(this.app.config.redis.constant.deviceList)
  }
//设置(添加+删除)
try {
    const ctx = app.createAnonymousContext()
    const route = await ctx.service.route.findOneAndCreate(data, true)
    await app.redis.hset(app.config.redis.constant.routeList, route.host, JSON.stringify({}))//设置单个
    // global.routeList[route.host] = {}
    await ctx.service.route.sendRoutes()
} catch (error) {
    app.logger.error(error)
}
client.connect(app.config.udp.portTcp, host, async function () {
    console.log('Tcp Connected : ' + host)
    // console.log('Tcp client : ', client)
    await app.redis.hset(app.config.redis.constant.routeList, host, JSON.stringify(client))//设置单个
    // global.routeList[host] = client
    await ctx.service.route.sendRoutes()
    resolve(client)
})
client.on('error', async function (err) {
    console.log(`===== tcp connect ${host} error`)
    await app.redis.hdel(app.config.redis.constant.routeList, host)//删除单个
    // delete global.routeList[host]
    await ctx.service.route.sendRoutes()
    reject(err)
})

//获取(所有 + 单个)
const hostClient = await app.redis.hget(app.config.redis.constant.routeList, host)//获取单个
// const hostClient = global.routeList[host]
if (hostClient) {
    return Promise.resolve(JSON.parse(hostClient))
} else {}
const hosts = await app.redis.hkeys(app.config.redis.constant.routeList)//获取所有
// const hosts = Object.keys(global.routeList)
let routes = await ctx.service.route.findAll()
for (const route of routes) {
    let status = 2
    if (hosts.indexOf(route.host) !== -1) {
        status = 1
    }
    route.status = status
    // if (route.status !== status) {//数据库不做状态存储,默认是离线
    //   route.status = status
    //   await route.save()
    // }
}
routes = _.orderBy(routes, ['status'], ['asc'])
app.io.emit('sRoutes', routes)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

踢用户下线及获取房间(群)在线用户列表

nsp.adapter.remoteDisconnect(id,ture,func)// 服务器主动踢用户下线;客户端触发 disconnect 事件

nsp.adapter.clients(rooms, (err, clients) => {})// 获取在线列表

nsp.to(room).emit('online', {})// 发送房间(群)消息;

const id = socket.id
const nsp = app.io.of('/')
const query = socket.handshake.query
const { room, userId } = query// 用户信息
const rooms = [room]
parseMsg (action, payload = {}, metadata = {}) {
    const meta = Object.assign({}, {
        timestamp: Date.now()
    }, metadata)
    return {meta,data: {action,payload}}
},
const tick = (id, msg) => {
    logger.debug('#tick', id, msg)
    socket.emit(id, helper.parseMsg('deny', msg))//踢出用户前发送消息
    nsp.adapter.remoteDisconnect(id, true, err => {//调用adapter方法踢出用户,客户端触发 disconnect 事件
        logger.error(err)
    })
}
logger.debug('#join', room)
socket.join(room)// 用户加入
nsp.adapter.clients(rooms, (err, clients) => {// 在线列表
    if (err) {console.log(err)}
    logger.debug('#online_join', clients)
    nsp.to(room).emit('online', {// 更新在线用户列表
        clients,
        action: 'join',
        target: 'participator',
        message: `User(${id}) joined.`
    })
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

egg在这方面做的消息格式优化:

上面的,parseMsg方法封装;

# AvPush

推送通知示例: 通过 installationId来推送消息到设备中;每个设备安装后会有一个installationId再提交保存在用户的 avs字段数组下;

  /**
   * @api {post} /notices/sendNotify 推送通知示例
   * @apiDescription 推送通知示例
   * @apiName sendNotify
   * @apiGroup Notice
   *
   * @apiPermission token
   * @apiHeader {String} Authorization jsonwebtoken
   *
   * @apiParam {String} type 类型
   * @apiParam {String} alert 大标题
   * @apiParam {String} title 小标题
   * @apiParam {String} content 消息内容
   */
  async sendNotify () {
    const {
      ctx,
      service
    } = this
    try {
      ctx.validate(this.CreateTransfer)
    } catch (error) {
      ctx.failure(error)
      return
    }
    let user = await service.user.one({
      id: ctx.state.user.id
    })
    const { type, alert, title, content } = ctx.request.body
    const data = {
      type,
      alert,
      title,
      content
    }
    const res = await service.av.sendNotify(user.avs, data, user.isNotice)
    ctx.success(res)
  }

async sendNotify (avs, data = {}, isNotice) {
    const { ctx, config, service } = this
    let res = { data: {} }
    if (isNotice) {
      const { appId, appKey, apiPush } = config.leanCloud
      data.type = data.type || 'default'
      // data.action = 'com.avos.UPDATE_STATUS'
      const notificationId = Date.now() + 'n'
      data.objectId = notificationId
      res = await ctx.curl(apiPush, {
        method: 'POST',
        headers: {
          'content-type': 'application/json;charset=UTF-8',
          'X-LC-Id': appId,
          'X-LC-Key': appKey
        },
        contentType: 'json',
        dataType: 'json',
        data: {
          req_id: Date.now() + 'r',
          notification_id: notificationId,
          where: { '$or': [{ 'installationId': { '$in': avs } }, { 'deviceToken': { '$in': avs } }] },
          data
        }
      })
      if (res.status !== 200) {
        return Promise.reject(res.data.error)
      }
    }
    if (ctx.state.user) {
      data.userId = ctx.state.user.id
    }
    await service.notice.findOneAndUpdate(data)
    return Promise.resolve(res.data)
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

# 携带header或参数

websocket协议在握手阶段借用了HTTP的协议,但是在JavaScript websocketAPI中并没有修改请求头的方法。

var token='dcvuahsdnfajw12kjfasfsdf34'

# send发送参数

var  ws = new WebSocket("ws://" + url + "/webSocketServer");
ws.onopen=function(){
    ws.send(token)
}
1
2
3
4

# 请求地址中带参数

var  ws = new WebSocket("ws://" + url?token + "/webSocketServer");
var  wss = new WebSocket("wss://" + url?token + "/webSocketServer");
1
2

var ws = new WebSocket("ws://url/1/3/9");

前台可以使用@PathParam以及{}去取

url里注意base64格式的数据可能会携带"/"需要进行替换

var ws = new WebSocket("ws://url?userid=1");

前台可以使用session.getURI和session.getQueryString取;

# 基于协议头

Websocket 客户端 API 不允许发送自定义 header ,它们允许您只设置一个 header ,即 Sec-WebSocket-Protocol,即特定于应用程序的子协议(protocol)。您可以使用此 header 来传递不记名 token 。

websocket请求头中可以包含Sec-WebSocket-Protocol这个属性,该属性是一个自定义的子协议。它从客户端发送到服务器并返回从服务器到客户端确认子协议。我们可以利用这个属性添加token。

var  ws = new WebSocket("ws://" + url+ "/webSocketServer",[token]);
1

案例:

var ws = new WebSocket("ws://example.com/path", "protocol");
var ws = new WebSocket("ws://example.com/path", ["protocol1", "protocol2"]);
1
2

The above results in the following headers:

Sec-WebSocket-Protocol: protocol
1

and

Sec-WebSocket-Protocol: protocol1, protocol2
1
 var socket = new WebSocket(newUrl, encodeURIComponent(xToken));
 //Sec-WebSocket-Protocol: admin%23null%232020-01-10%2021%3A00%23
1
2

image-20211110160109990

如果传递了token参数,后端响应的时候,也必须带上这个token响应!否则前端接收不到数据!

而后端的websocket如果在header里携带token呢?这里给出golang 的写法:

var upgrader = websocket.Upgrader{
        Subprotocols: []string{r.Header.Get("Sec-WebSocket-Protocol")},
}        
1
2
3

通过这样设置,前后端就可以携带token愉快的通信了。

# Stomp协议

STOMP 中文为: 面向消息的简单文本协议;

websocket定义了两种传输信息类型:文本信息和二进制信息。类型虽然被确定,但是他们的传输体是没有规定的。所以,需要用一种简单的文本传输类型来规定传输内容,它可以作为通讯中的文本传输协议。

STOMP是基于帧的协议,客户端和服务器使用STOMP帧流通讯

一个STOMP客户端是一个可以以两种模式运行的用户代理,可能是同时运行两种模式。

  • 作为生产者,通过SEND框架将消息发送给服务器的某个服务
  • 作为消费者,通过SUBSCRIBE制定一个目标服务,通过MESSAGE框架,从服务器接收消息。

例如:

COMMAND
header1:value1
header2:value2

Body^@
1
2
3
4
5

注:帧以commnand字符串开始,以EOL结束。其中包括可选回车符(13字节),紧接着是换行符(10字节)。command下面是0个或多个<key>:<value>格式的header条目, 每个条目由EOL结束。一个空白行(即额外EOL)表示header结束和body开始。body连接着NULL字节。本文档中的例子将使用^@代表NULL字节。NULL字节可以选择跟多个EOLs。欲了解更多关于STOMP帧的详细信息,请参阅STOMP1.2协议规范 (opens new window)

# STOMP 1.2 协议

STOMP 1.2 clients 必须设置以下headers:

  • 1.accept-version: clients支持的STOMP的版本号。
  • 2.host:client希望连接的虚拟主机名字

可选择设置以下headers:这个是消息中传递参数;

  • 1.login: 用于在server验证的用户id
  • 2.passcode: 用于在server验证的密码
  • 3.heart-beat: 心跳设置

注:STOMP协议大小写敏感

var headers = {
  login: 'mylogin',
  passcode: 'mypasscode',
  // additional header
  'client-id': 'my-client-id'
};
// const headers = this.get('headers')
//const xToken = window.localStorage.getItem('X-Token')
//const headers = Object.assign({
//  'X-Token': xToken
//}, this.get('headers'));
client.connect(headers, connectCallback);
1
2
3
4
5
6
7
8
9
10
11
12

image-20211110151227326

# 常用Command

  • CONNECT
  • CONNECTED
  • SEND
  • SUBSRIBE
  • UNSUBSRIBE
  • BEGIN
  • COMMIT
  • ABORT
  • ACK
  • NACK
  • DISCONNECT

# CONNECT

STOMP客户端通过初始化一个数据流或者TCP链接发送CONNECT帧到服务端,例如:

CONNECT

accept-version:1.2
host:stomp.test

^@
1
2
3
4
5
6

# CONNECTED

如果服务端接收了链接意图,它回回复一个CONNECTED帧:

CONNECTED

version:1.2

^@
1
2
3
4
5

正常链接后客户端和服务端就可以正常收发信息了。

# SEND

客户端主动发送消息到服务器,例如:

SEND
destination:/queue/a
content-type:text/plain

I am send body
^@
1
2
3
4
5
6

注: 必须包含destination目标地址,如果没有content-type,默认表示传递的二进制.

# SUBSCRIBE

客户端注册给定的目的地,被订阅的目的地收到的任何消息将通过MESSAGE Frame发送给client。 ACK 控制着确认模式。

SUBSCRIBE
id:0
destination:/queue/foo
ack:client

^@
1
2
3
4
5
6

id:一个单连接可以对应多个开放的servers订阅,这个id用来客户端和服务端处理与订阅消息和取消订阅相关的动作。

ack:可用的值有auto, client,client-individual, 默认为auto.

ackauto时,client收到server发来的消息后不需要回复ACK帧.server假定消息发出去后client就已经收到。这种模式下可能导致服务端向客户端发送的消息丢失

ackclient时, 客户端收到服务端信息之后必须回复ACK帧。如果在收到客户端回复的ACK之前连接断开,服务端会认为这个消息没有被处理而改发给其他客户端。客户端回复的ACK会被当做累加的处理。这意味着对信息的确认操作不仅仅是确认了这单个的消息,还确认了这个订阅之前发送的所有消息(即接收到一个确认消息就会把之前的消息一起确认掉,批量操作)。

由于client不能处理某些消息,所以client应该发送NACK帧去告诉server它不能消费这些消息。

ack模式是client-individual,确认操作就跟client模式一样,除了ACKNACK不是累加的。这意味着当后来的一个消息得到ACKNACK之后,之前的那个消息没有被ACKNACK,它需要单独的确认。

# UNSUBSRIBE

UNSUBSCRIBE用来移除一个已经存在订阅,一旦一个订阅被从连接中取消,那么客户端就再也不会收到来自这个订阅的消息。

UNSUBSCRIBE

id:0

^@
1
2
3
4
5

由于一个连接可以添加多个服务端的订阅,所以id头是UNSUBSCRIBE必须包含的,用来唯一标示要取消的是哪一个订阅。id的值必须是一个已经存在的订阅的标识。

# ACK

ACK是用来在clientclient-individual模式下确认已经收到一个订阅消息的操作。在上述模式下任何订阅消息都被认为是没有被处理的,除非客户端通过回复ACK确认。

ACK

id:12345

transaction:tx1

^@
1
2
3
4
5
6
7

ACK中必须包含一个id头,头域内容来自对应的需要确认的MESSAGE的ack头。可以选择的指定一个transaction头,标示这个消息确认动作是这个事务内容的一部分。

# NACK

NACK是ACK的反向,它告诉服务端客户端没有处理该消息。服务端可以选择性的处理该消息,重新发送到另一个客户端或者丢弃它或者把他放到无效消息队列中记录。

NACK包含和ACK相同的头信息:id(必须)和transaction(非必须)。

# BEGIN

BEGIN用于开启一个事务-transaction。这种情况下的事务适用于发送消息和确认已经收到的消息。在一个事务期间,任何发送和确认的动作都会被当做事务的一个原子操作。

BEGIN

transaction:tx1

^@
1
2
3
4
5

帧中transaction头是必须的,并且transaction的标示会被用在SENDCOMMITABORTACKNACK中,使之与该事务绑定。同一个链接中的不同事务必须使用不同的标示。 当客户端发送一个DISCONNECT或者TCP链接由于任何原因断开时,任何打开的但是还没有被提交的事务都会被默认的立即中断。

# COMMIT

用来提交一个事务到处理队列中,帧中的transaction头是必须得,用以标示是哪个事务被提交。

COMMIT

transaction:tx1

^@
1
2
3
4
5

# ABORT

ABORT用于中止正在执行的事务,帧中的transaction头是必须得,用以标示是哪个事务被终止。

ABORT

transaction:tx1

^@
1
2
3
4
5

# DISCONNECT

客户端可以通过DISCONNECT帧表示正常断开链接

# MESSAGE

MESSAGE用于传输从服务端订阅的消息到客户端。

MESSAGE中必须包含destionation头,用以表示这个消息应该发送的目标。如果这个消息被使用STOMP发送,那么这个destionation应该与相应的SEND帧中的目标一样。

MESSAGE中必须包含message-id头,用来唯一表示发送的是哪一个消息,以及subscription头用来表示接受这个消息的订阅的唯一标示。

如果收到的订阅消息明确表示需要确认,那么MESSAGE中应该包含一个任意值的ack头,这个值被用来在回复确认时标示这条信息。

MESSAGE如果有body内容,则必须包含content-lengthcontent-type头。

MESSAGE

content-length:100

content-type:text/plain

destination:/queue/a

message-id:007

subscription:0

Hello queue a

^@
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# ERROR

如果连接过程中出现什么错误,服务端就会发送ERROR。在这种情况下,服务端发出ERROR之后必须马上断开连接。

# RECEIPT

每当服务端收到来自客户端的需要receipt的帧时发送给客户端

# 结合webScocket和Stomp协议

在正文的实时消息模块就结合了webScocket和Stomp。**基于websocket的一层STOMP封装,让业务端只需关心数据本身,不需要太过关心文本协议。**当然还是需要了解一些STOMP协议各个Frame的概念和应用场景。

  connect: function (useSockJS) {
    console.debug('Websocket === Start Connection');
    this.disconnect(); // 先断掉之前的连接的
    const dfd = $.Deferred();
    const socket = this.getSocket(useSockJS);
    const client = Stomp.over(socket);
    // TODO:处理token头部
    const headers = this.get('headers');
    client.connect(headers, () => {
      this.onConnectionSuccess();
      if (!Conf.isPro) client.debug = Em.K;
      this.set('client', client);
      dfd.resolve();
    }, (error) => {
      dfd.reject(this.onConnectionError(error));
    });
    return dfd.promise();
  },

  getSocket: function () {
    var wsApi = window.location.protocol === 'https:' ? 'wss:' : 'ws:' + window.location.host;
    var newUrl = this.get('webSocketUrl2').replace('{wsApi}', wsApi)
    var socket
    if ('WebSocket' in window) {
      socket = new WebSocket(newUrl);
    } else if ('MozWebSocket' in window) {
      socket = new window.MozWebSocket(newUrl);
    } else {
      socket = new window.SockJS(newUrl);
    }
   return socket
  },
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 调试工具

# 最新版本postman调试

# 使用教程 (opens new window)

# 参考链接

https://socket.io/docs/

https://eggjs.org/zh-cn/tutorials/socketio.html

https://www.cnblogs.com/goloving/p/10746378.html

WebSocket和Stomp协议 (opens new window)

https://stackoverflow.com/questions/4361173/http-headers-in-websockets-client-api

上次更新: 2022/04/15, 05:41:29
×