Commit abd4d84d authored by 胡文斌's avatar 胡文斌

修改消息的Websocket

parent 968f63ce
package com.archser.aserver.websocket; package com.archser.aserver.websocket;
import java.io.IOException;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.apache.log4j.Logger;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.archser.aserver.interceptor.JwtSigningKeyResolver; import com.archser.aserver.interceptor.JwtSigningKeyResolver;
import com.archser.aserver.service.MessageService; import com.archser.aserver.service.MessageService;
import com.google.gson.JsonParser;
import com.jfinal.aop.Aop; import com.jfinal.aop.Aop;
import com.jfinal.json.FastJson; import com.jfinal.json.FastJson;
import com.jfinal.kit.StrKit; import com.jfinal.kit.StrKit;
import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Page; import com.jfinal.plugin.activerecord.Page;
import com.jfinal.plugin.activerecord.Record; import com.jfinal.plugin.activerecord.Record;
import io.jsonwebtoken.Claims; import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts; import io.jsonwebtoken.Jwts;
import io.undertow.websockets.jsr.UndertowSession; import org.apache.log4j.Logger;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ServerEndpoint(value = "/message.ws/{params}", configurator = MessageWebSocketConfig.class) @ServerEndpoint(value = "/message.ws/{params}", configurator = MessageWebSocketConfig.class)
public class MessageWebSocket { public class MessageWebSocket {
...@@ -43,7 +32,7 @@ public class MessageWebSocket { ...@@ -43,7 +32,7 @@ public class MessageWebSocket {
// 用来存放每个客户端对应的MessageWebSocket对象,适用于同时与多个客户端通信 // 用来存放每个客户端对应的MessageWebSocket对象,适用于同时与多个客户端通信
// public static final CopyOnWriteArraySet<MessageWebSocket> webSocketSet = new CopyOnWriteArraySet<MessageWebSocket>(); // public static final CopyOnWriteArraySet<MessageWebSocket> webSocketSet = new CopyOnWriteArraySet<MessageWebSocket>();
// 若要实现服务端与指定客户端通信的话,可以使用Map来存放,其中Key可以为用户标识 // 若要实现服务端与指定客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
public static final ConcurrentHashMap<UndertowSession, MessageWebSocket> webSocketMap = new ConcurrentHashMap<UndertowSession, MessageWebSocket>(); public static final ConcurrentHashMap<Session, MessageWebSocket> webSocketMap = new ConcurrentHashMap<Session, MessageWebSocket>();
public static final String HEART_TEST_MSG = "keep-alive"; // 心跳测试接受到消息的字符串 public static final String HEART_TEST_MSG = "keep-alive"; // 心跳测试接受到消息的字符串
...@@ -60,7 +49,7 @@ public class MessageWebSocket { ...@@ -60,7 +49,7 @@ public class MessageWebSocket {
// 与某个客户端的连接会话,通过它实现定向推送(只推送给某个用户) // 与某个客户端的连接会话,通过它实现定向推送(只推送给某个用户)
private UndertowSession session; private Session session;
MessageService messageService; MessageService messageService;
...@@ -117,7 +106,7 @@ public class MessageWebSocket { ...@@ -117,7 +106,7 @@ public class MessageWebSocket {
this.initParam(config); this.initParam(config);
this.messageService = Aop.get(MessageService.class); this.messageService = Aop.get(MessageService.class);
// session.setMaxIdleTimeout(this.timeout); // session.setMaxIdleTimeout(this.timeout);
this.session = (UndertowSession) session; this.session = session;
this.receiveMeg = true; this.receiveMeg = true;
webSocketMap.put(this.session, this); // 加入map中 webSocketMap.put(this.session, this); // 加入map中
this.params = params; this.params = params;
...@@ -422,12 +411,12 @@ public class MessageWebSocket { ...@@ -422,12 +411,12 @@ public class MessageWebSocket {
this.session.getBasicRemote().sendText(message); this.session.getBasicRemote().sendText(message);
} }
public UndertowSession getSession() { public Session getSession() {
return this.session; return this.session;
} }
// 定向发送信息 // 定向发送信息
private void sendMessage(UndertowSession mySession, String message) { private void sendMessage(Session mySession, String message) {
synchronized (this) { synchronized (this) {
try { try {
if (mySession.isOpen() && !localClose) {// 该session如果已被删除,则不执行发送请求,防止报错 if (mySession.isOpen() && !localClose) {// 该session如果已被删除,则不执行发送请求,防止报错
......
...@@ -13,9 +13,8 @@ import com.archser.aserver.util.CollectionUtil; ...@@ -13,9 +13,8 @@ import com.archser.aserver.util.CollectionUtil;
import com.jfinal.kit.Prop; import com.jfinal.kit.Prop;
import com.jfinal.kit.PropKit; import com.jfinal.kit.PropKit;
import io.undertow.websockets.jsr.ServerEndpointConfigImpl;
public class MessageWebSocketConfig extends ServerEndpointConfigImpl.Configurator{ public class MessageWebSocketConfig extends ServerEndpointConfig.Configurator{
/** /**
* 定时发送消息周期单位毫秒 * 定时发送消息周期单位毫秒
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment