最近收到一个需求,需要做一套消息中心,需求倒是很简单,再用户有新消息时推送给web页面,提示用户有新消息未读,最初版本用短轮询方案实现,若是局部组件,可关闭页面后结束轮询,但是此消息中心是一个全局组件,只要开启页面就开始轮询,体验不佳,看着network密密麻麻的请求头都大了,随使用SSE方案。
        
          对于这种消息推送目前几个成熟方案:
      
- 客户端轮询 (短轮询)
- 服务端轮询 (长轮询)
- WebSocket
- SSE (Serve-Send Events)
         
      
        
          各方案间区别:
      
| * | 客户端轮询 | 服务端轮询 | WebSocket | SSE | 
| 协议 | http | http | tcp | http | 
| 优点 | 实现方便,兼容性好 | 同短轮询,但比短轮询节约资源,相对短轮询请求次数少 | 双全工通信协议,性能开销相对较小,可双向通信 | H5规范的一部分,无需安装直接使用;资源占用小;前端部分实现极其简单 | 
| 缺点 | 占用较多内存和请求数;污染network列表 | 同短轮询 | 开发成本高;相对sse资源开销大 | 单向推送;兼容性问题;只能get请求,且请求头无法加内容(或者使用第三方封装sse插件) | 
          SSE兼容问题:
      
         
      
        
          SSE实现
      
        
          1. 前端实现 + visibilitychange性能优化
      经过测试环境几天的运行发现了新的问题,当用户挂机时,js代码也在正常的跑,如果用户忘记关闭页面,且电脑从不关机,就会导致页面请求每隔一小时发送一次,实际绝大部分挂机时间不需要维持此链接,那么我们可以通过visibilitychange事件优化我们的消息通知
         
      
通过mdn文档可知,我们可以通过visible和hidden去优化我们的代码
以下代码为最新代码,带浏览器降级处理
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 
 | const RETRY_TIME = 15 * 1000
 
 mounted() {
 
 this.initNotify()
 
 this.initListener()
 },
 
 methods: {
 
 initNotify() {
 if ('EventSource' in window) {
 
 this.getUnReadNoticeCount()
 
 this.registerEvent()
 } else {
 
 clearTimeout(this.timer)
 this.loopFetch()
 }
 },
 
 initListener() {
 document.addEventListener('visibilitychange', this.listeningChange)
 },
 
 listeningChange() {
 if (document.visibilityState === 'hidden') {
 
 this.clearAllTimer()
 } else if (this.eventSource) {
 if (this.eventSource.readyState === 2) {
 
 this.registerEvent()
 }
 } else {
 
 this.loopFetch()
 }
 },
 
 registerEvent() {
 if (document.visibilityState === 'visible') {
 const now = new Date().getTime()
 if (!this.sourceTime || (now - this.sourceTime) >= RETRY_TIME) {
 this.sourceTime = new Date().getTime()
 const user = getUserInfo()
 const userId = user ? JSON.parse(user).userId : ''
 
 this.eventSource = new EventSource(notifyNumUrl(userId))
 console.log('eventSource: ', this.eventSource);
 
 this.eventSource.onopen = () => {
 clearTimeout(this.errorTimer)
 }
 
 this.eventSource.onmessage = ({ data }) => {
 if (data) {
 this.noticeNum = data
 }
 }
 
 this.eventSource.onerror = (error) => {
 if (error.currentTarget.readyState === 2) {
 this.eventSource.close && this.eventSource.close()
 this.registerEvent()
 }
 }
 } else {
 this.errorTimer = setTimeout(() => {
 this.registerEvent()
 }, RETRY_TIME)
 }
 }
 },
 
 loopFetch() {
 this.getUnReadNoticeCount()
 this.timer = setTimeout(() => {
 this.loopFetch()
 }, 1000 * 30)
 },
 
 getUnReadNoticeCount() {
 
 },
 
 clearAllTimer() {
 if (this.eventSource) {
 clearTimeout(this.errorTimer)
 } else {
 clearTimeout(this.timer)
 }
 },
 },
 
 
 beforeDestroy() {
 document.removeEventListener('visibilitychange', this.listeningChange)
 this.clearAllTimer()
 this.eventSource && this.eventSource.close && this.eventSource.close()
 },
 
 | 
          2. 后端实现 JAVA Spring Web MVC
      | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 
 | @RestController@RequestMapping("/sse")
 @Api(tags = "Sse消息通知")
 @Slf4j
 public class SseController extends BaseController {
 @Resource
 private SseService sseService;
 
 
 
 
 
 @GetMapping("/create")
 public Object createSession(String userId) throws IOException {
 return sseService.createSession(userId);
 }
 
 
 
 
 
 @GetMapping("/close")
 public void closeSession(String userId) throws IOException {
 sseService.closeSession(userId);
 }
 }
 
 @DubboService
 @Service
 @Slf4j
 public class SseServiceImpl implements SseService {
 
 public final static Map<String, SseEmitter> SSE_CACHE= new ConcurrentHashMap<>();
 
 public final static Map<String, Integer> SEND_RECORD= new ConcurrentHashMap<>();
 
 
 
 
 
 public synchronized SseEmitter createSession(String clientId) throws IOException{
 
 SEND_RECORD.remove(clientId);
 
 SseEmitter sseEmitter = new SseEmitter(0L);
 SSE_CACHE.put(clientId,sseEmitter);
 log.info("客户端:{}  新建连接成功,当前客户端总数为【{}】",clientId,SSE_CACHE.size() );
 return sseEmitter;
 }
 
 @Override
 public void closeSession(String clientId) {
 if (SSE_CACHE.containsKey(clientId)){
 SSE_CACHE.get(clientId).complete();
 SSE_CACHE.remove(clientId);
 SEND_RECORD.remove(clientId);
 log.info("客户端:【{}】 断开成功,当前剩余客户端总数为【{}】",clientId,SSE_CACHE.size());
 }
 }
 }
 
 
 
 
 @XxlJob("sendUnRead")
 public void sendUnRead(){
 if (SseServiceImpl.SSE_CACHE.size()>0){
 List<UnReadNoticePO> unReadNoticePOS = noticeService.countAllUnReadNotice(new ArrayList<>(SseServiceImpl.SSE_CACHE.keySet()));
 log.info("stringIntegerMap",unReadNoticePOS);
 if(CollectionUtil.isNotEmpty(unReadNoticePOS)){
 
 for (Map.Entry<String, SseEmitter> entry : SseServiceImpl.SSE_CACHE.entrySet()) {
 String key = entry.getKey();
 Optional<UnReadNoticePO> first = unReadNoticePOS.stream().filter(u -> u.getUserId().equals(key)).findFirst();
 
 if(first.isPresent()){
 log.info("first",first.get());
 SseEmitter sseEmitter = SseServiceImpl.SSE_CACHE.get(key);
 try {
 Integer lastNum = SseServiceImpl.SEND_RECORD.get(first.get().getUserId());
 if(ObjectUtil.isEmpty(lastNum) || lastNum!=first.get().getNum()){
 sseEmitter.send(SseEmitter.event().reconnectTime(1000).id(entry.getKey()).data(first.get().getNum()));
 log.info(entry.getKey()+"发送消息成功,内容:{}",first.get().getNum());
 }
 SseServiceImpl.SEND_RECORD.put(first.get().getUserId(),first.get().getNum());
 }catch (IOException e){
 
 SseServiceImpl.SSE_CACHE.remove(entry.getKey());
 log.error(entry.getKey()+"消息发送失败,通道已关闭!",e);
 
 }
 }
 }
 }
 }
 }
 
 | 
          3. 运维
      运维需要配合修改nginx配置的proxy-read-timeout超时时间,目前我们方案超时时间一小时。
        
          成品展示
      此时只保留一条http请求,后端轮询到新消息就推送到前端,若前端页面不显示或被隐藏(锁屏切标签等)则不去请求,若页面显示则自动恢复请求。方便快捷,体验更好
        