From b73c3e89dd5ffcfaa3e38fbd30fc95b07caab58e Mon Sep 17 00:00:00 2001 From: liuxiaobo <1224730913@qq.com> Date: Tue, 17 Jun 2025 18:24:33 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E8=AF=95=E6=B8=85=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- etcd/etcd.go | 23 ++++++++++++-- etcd/etcdImpl.go | 1 - service/natsService.go | 9 +++--- ws/userMgr.go | 22 ++++++++----- ws/wsConn.go | 12 +++++-- ws/wsConnMgr.go | 72 ++++++++++++++++++++++++++++++++++++++++++ ws/wsMgr.go | 67 --------------------------------------- ws/wsServer.go | 27 +++++++++++----- 8 files changed, 139 insertions(+), 94 deletions(-) create mode 100644 ws/wsConnMgr.go delete mode 100644 ws/wsMgr.go diff --git a/etcd/etcd.go b/etcd/etcd.go index 1f42ae3..54af1e5 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -20,8 +20,9 @@ func NewRegistry[T INode](endpoints []string, username, password string, me T) ( } e := &Registry[T]{} - e.etcdRegistryImpl, err = newServiceRegistryImpl(endpoints, me.EtcdRootKey(), username, password, e, me.EtcdKey(), string(bs)) e.nodes = &sync.Map{} + e.etcdRegistryImpl, err = newServiceRegistryImpl(endpoints, me.EtcdRootKey(), username, password, e, me.EtcdKey(), string(bs)) + return e, err } @@ -36,12 +37,28 @@ func (r *Registry[T]) saveNode(newNodes *sync.Map, jsonBytes []byte) { // 保存当前服务 func (r *Registry[T]) replace(newNodes *sync.Map) { + //log.DebugF("更新前.nodes ptr:%d", &r.nodes) r.nodes = newNodes + //r.nodes.Range(func(key, value interface{}) bool { + // bV, _ := json.Marshal(value) + // log.DebugF("has node key:%s value:%s", key, string(bV)) + // return true + //}) + //log.DebugF("更新所有节点信息.nodes ptr:%d", &r.nodes) } // 获取当前根节点下所有节点信息 -func (r *Registry[T]) GetNodes() *sync.Map { - return r.nodes +func (r *Registry[T]) RangeNode(cb func(key string, value *T) bool) { + //log.DebugF("循环处理.nodes ptr:%d", &r.nodes) + r.nodes.Range(func(key, value interface{}) bool { + k, _ := key.(string) + if node, ok := value.(T); ok { + return cb(k, &node) + } else { + //log.DebugF("转换失败.key:%v", key) + } + return true + }) } // 根据inode的mapKey()查找对应的节点 diff --git a/etcd/etcdImpl.go b/etcd/etcdImpl.go index 615dc7e..c97a618 100644 --- a/etcd/etcdImpl.go +++ b/etcd/etcdImpl.go @@ -159,7 +159,6 @@ func (sr *etcdRegistryImpl) discoverServices() error { newNodes := &sync.Map{} for _, kv := range resp.Kvs { sr.nodeOperator.saveNode(newNodes, kv.Value) - //log.Debug(fmt.Sprintf("save node key:%s value:%s", string(kv.Key), string(kv.Value))) } sr.nodeOperator.replace(newNodes) diff --git a/service/natsService.go b/service/natsService.go index a12c92e..95f31af 100644 --- a/service/natsService.go +++ b/service/natsService.go @@ -11,7 +11,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/nats-io/nats.go" "os" - "sync" "time" ) @@ -175,10 +174,10 @@ func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] { return s.registry } -// 从etcd中获取所有服务节点 -func (s *NatsService) GetServiceNodes() *sync.Map { - return s.registry.GetNodes() -} +//// 从etcd中获取所有服务节点 +//func (s *NatsService) GetServiceNodes() *sync.Map { +// return s.registry.GetNodes() +//} // 查找指定的服务节点信息 func (s *NatsService) FindServiceNode(serviceName string) (etcd.ServiceNode, error) { diff --git a/ws/userMgr.go b/ws/userMgr.go index 322778f..9f683f8 100644 --- a/ws/userMgr.go +++ b/ws/userMgr.go @@ -4,21 +4,26 @@ import ( "sync" ) -var userMgr = newUserManager() +//var userMgr = newUserManager() type userManager struct { - users sync.Map // map[int64]uint32 + users sync.Map // map[int64]uint32 + connMgr *connManager } -func newUserManager() *userManager { - return &userManager{} +func newUserManager(connMgr *connManager) *userManager { + return &userManager{ + connMgr: connMgr, + } } func (m *userManager) Add(connId uint32, userId int64) bool { - if userId < 1 { - return false - } - if conn, ok := wsMgr.Get(connId); ok { + if conn, ok := m.connMgr.Get(connId); ok { + if conn.UserId() == 0 { + //log.DebugF("添加玩家:%v 连接:%v", userId, connId) + } else { + //log.DebugF("将连接:%v里的玩家id:%v设置为:%v", connId, conn.UserId(), userId) + } conn.setUserId(userId) m.users.Store(userId, connId) return true @@ -42,5 +47,6 @@ func (m *userManager) Remove(userId int64) { if userId < 1 { return } + //log.DebugF("删除玩家:%v", userId) m.users.Delete(userId) } diff --git a/ws/wsConn.go b/ws/wsConn.go index e9173ce..21e5477 100644 --- a/ws/wsConn.go +++ b/ws/wsConn.go @@ -32,9 +32,10 @@ type wsConnect struct { userId int64 onDisconnect func(IConn) once sync.Once + connMgr *connManager } -func newWsConnect(wsConn *websocket.Conn, onDisconnect func(IConn)) *wsConnect { +func newWsConnect(wsConn *websocket.Conn, onDisconnect func(IConn), connMgr *connManager) *wsConnect { c := &wsConnect{ wsConn: wsConn, inChan: safeChan.NewSafeChan[*wsMessage](1000), @@ -42,17 +43,24 @@ func newWsConnect(wsConn *websocket.Conn, onDisconnect func(IConn)) *wsConnect { id: nextConnId, userId: 0, onDisconnect: onDisconnect, + connMgr: connMgr, } return c } // 把消息放进写队列 func (c *wsConnect) SendMsg(data []byte) error { + if c == nil { + return fmt.Errorf("wsConnect is nil") + } return c.outChan.Write(&wsMessage{messageType: wsMsgType, data: data}) } // 关闭链接 func (c *wsConnect) Close() { + if c == nil { + return + } c.once.Do(func() { //log.Debug(c.Log("关闭链接")) c.inChan.Close() @@ -61,7 +69,7 @@ func (c *wsConnect) Close() { if c.onDisconnect != nil { c.onDisconnect(c) } - wsMgr.Remove(c) + c.connMgr.Remove(c) }) } diff --git a/ws/wsConnMgr.go b/ws/wsConnMgr.go new file mode 100644 index 0000000..59ee744 --- /dev/null +++ b/ws/wsConnMgr.go @@ -0,0 +1,72 @@ +package ws + +import ( + "sync" +) + +//var connMgr = newConnManager() + +type connManager struct { + wsConnAll sync.Map // cmap.ConcurrentMap[uint32, *wsConnect] + userMgr *userManager +} + +// 连接管理器 +func newConnManager(userMgr *userManager) *connManager { + return &connManager{ + //wsConnAll: cmap.NewWithCustomShardingFunction[uint32, *wsConnect](func(key uint32) uint32 { + // return key + //}), + userMgr: userMgr, + } +} + +func (m *connManager) Add(conn *wsConnect) { + //log.DebugF("添加连接:%v 玩家:%v", conn.Id(), conn.UserId()) + m.wsConnAll.Store(conn.id, conn) +} + +func (m *connManager) SetUserId(connId uint32, userId int64) { + m.userMgr.Add(connId, userId) +} + +func (m *connManager) Remove(conn *wsConnect) { + if conn.UserId() > 0 { + m.userMgr.Remove(conn.UserId()) + } + //log.DebugF("删除连接:%v 玩家:%v", conn.Id(), conn.UserId()) + m.wsConnAll.Delete(conn.id) +} + +func (m *connManager) Get(connId uint32) (*wsConnect, bool) { + v, ok := m.wsConnAll.Load(connId) + if ok { + conn, ok := v.(*wsConnect) + return conn, ok + } + return nil, false +} + +func (m *connManager) FindByUserId(userId int64) (*wsConnect, bool) { + connId := m.userMgr.GetConnId(userId) + return m.Get(connId) +} + +func (m *connManager) Rang(cb func(conn IConn) bool) { + m.userMgr.Rang(func(_, v any) bool { + connId := v.(uint32) + if conn, ok := m.Get(connId); ok { + return cb(conn) + } + return true + }) +} + +func (m *connManager) Count() int { + count := 0 + m.wsConnAll.Range(func(k, v interface{}) bool { + count++ + return true + }) + return count +} diff --git a/ws/wsMgr.go b/ws/wsMgr.go deleted file mode 100644 index 046e8e8..0000000 --- a/ws/wsMgr.go +++ /dev/null @@ -1,67 +0,0 @@ -package ws - -import ( - "sync" -) - -var wsMgr = newManager() - -type wsManager struct { - wsConnAll sync.Map // cmap.ConcurrentMap[uint32, *wsConnect] -} - -func newManager() *wsManager { - return &wsManager{ - //wsConnAll: cmap.NewWithCustomShardingFunction[uint32, *wsConnect](func(key uint32) uint32 { - // return key - //}), - } -} - -func (m *wsManager) Add(conn *wsConnect) { - m.wsConnAll.Store(conn.id, conn) -} - -func (m *wsManager) SetUserId(connId uint32, userId int64) { - userMgr.Add(connId, userId) -} - -func (m *wsManager) Remove(conn *wsConnect) { - if conn.UserId() > 0 { - userMgr.Remove(conn.UserId()) - } - m.wsConnAll.Delete(conn.id) -} - -func (m *wsManager) Get(connId uint32) (*wsConnect, bool) { - v, ok := m.wsConnAll.Load(connId) - if ok { - conn, ok := v.(*wsConnect) - return conn, ok - } - return nil, false -} - -func (m *wsManager) FindByUserId(userId int64) (*wsConnect, bool) { - connId := userMgr.GetConnId(userId) - return m.Get(connId) -} - -func (m *wsManager) Rang(cb func(conn IConn) bool) { - userMgr.Rang(func(_, v any) bool { - connId := v.(uint32) - if conn, ok := m.Get(connId); ok { - return cb(conn) - } - return true - }) -} - -func (m *wsManager) Count() int { - count := 0 - m.wsConnAll.Range(func(k, v interface{}) bool { - count++ - return true - }) - return count -} diff --git a/ws/wsServer.go b/ws/wsServer.go index 34dc36b..3b47d1e 100644 --- a/ws/wsServer.go +++ b/ws/wsServer.go @@ -40,10 +40,21 @@ type WsServer struct { addr string // 0.0.0.0:8888 onMessage func(IConn, []byte) onDisconnect func(IConn) + connMgr *connManager + userMgr *userManager } func NewWsServer(addr string, onMessage func(IConn, []byte), onDisconnect func(IConn)) *WsServer { - return &WsServer{addr: addr, onMessage: onMessage, onDisconnect: onDisconnect} + wss := &WsServer{ + addr: addr, + onMessage: onMessage, + onDisconnect: onDisconnect, + connMgr: newConnManager(nil), + userMgr: newUserManager(nil), + } + wss.connMgr.userMgr = wss.userMgr + wss.userMgr.connMgr = wss.connMgr + return wss } func (s *WsServer) wsHandle(w http.ResponseWriter, r *http.Request) { @@ -54,9 +65,9 @@ func (s *WsServer) wsHandle(w http.ResponseWriter, r *http.Request) { } // defer func() { _ = conn.close() }() nextConnId++ - wsConn := newWsConnect(conn, s.onDisconnect) - wsMgr.Add(wsConn) - log.DebugF("当前连接数:%v", wsMgr.Count()) + wsConn := newWsConnect(conn, s.onDisconnect, s.connMgr) + s.connMgr.Add(wsConn) + log.DebugF("当前连接数:%v", s.connMgr.Count()) log.DebugF("新连接id:%v %v", wsConn.Id(), wsConn.Name()) ksync.GoSafe(func() { wsConn.handle(s.onMessage) }, nil) ksync.GoSafe(wsConn.readWsLoop, nil) @@ -89,17 +100,17 @@ func (s *WsServer) Run() { } func (s *WsServer) SetUserId(connId uint32, userId int64) { - wsMgr.SetUserId(connId, userId) + s.connMgr.SetUserId(connId, userId) } func (s *WsServer) FindConnByUserId(userId int64) (IConn, bool) { - return wsMgr.FindByUserId(userId) + return s.connMgr.FindByUserId(userId) } func (s *WsServer) FindConnByConnId(connId uint32) (IConn, bool) { - return wsMgr.Get(connId) + return s.connMgr.Get(connId) } func (s *WsServer) Rang(cb func(conn IConn) bool) { - wsMgr.Rang(cb) + s.connMgr.Rang(cb) }