diff --git a/service/natsService.go b/service/natsService.go index 67e5de0..b08959c 100644 --- a/service/natsService.go +++ b/service/natsService.go @@ -107,6 +107,10 @@ func (s *NatsService) Call(topic string, timeout time.Duration, msg []byte) ([]b return nil, nil } +func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] { + return s.registry +} + // 从etcd中获取所有服务节点 func (s *NatsService) GetServiceNodes() *sync.Map { return s.registry.GetNodes() diff --git a/ws/userMgr.go b/ws/userMgr.go index 5697746..96e693d 100644 --- a/ws/userMgr.go +++ b/ws/userMgr.go @@ -1,18 +1,20 @@ package ws -import cmap "github.com/orcaman/concurrent-map/v2" +import ( + "sync" +) var userMgr = newUserManager() type userManager struct { - users cmap.ConcurrentMap[int64, uint32] + users sync.Map // cmap.ConcurrentMap[int64, uint32] } func newUserManager() *userManager { return &userManager{ - users: cmap.NewWithCustomShardingFunction[int64, uint32](func(key int64) uint32 { - return uint32(key) - }), + //users: cmap.NewWithCustomShardingFunction[int64, uint32](func(key int64) uint32 { + // return uint32(key) + //}), } } @@ -22,20 +24,22 @@ func (m *userManager) Add(connId uint32, userId int64) bool { } if conn, ok := wsMgr.Get(connId); ok { conn.setUserId(userId) - m.users.Set(userId, connId) + m.users.Store(userId, connId) return true } return false } func (m *userManager) GetConnId(userId int64) uint32 { - connId, _ := m.users.Get(userId) - return connId + if connId, ok := m.users.Load(userId); ok { + return connId.(uint32) + } + return 0 } func (m *userManager) Remove(userId int64) { if userId < 1 { return } - m.users.Remove(userId) + m.users.Delete(userId) } diff --git a/ws/wsMgr.go b/ws/wsMgr.go index 36b737c..b2ebb45 100644 --- a/ws/wsMgr.go +++ b/ws/wsMgr.go @@ -1,23 +1,25 @@ package ws -import cmap "github.com/orcaman/concurrent-map/v2" +import ( + "sync" +) var wsMgr = newManager() type wsManager struct { - wsConnAll cmap.ConcurrentMap[uint32, *wsConnect] + wsConnAll sync.Map // cmap.ConcurrentMap[uint32, *wsConnect] } func newManager() *wsManager { return &wsManager{ - wsConnAll: cmap.NewWithCustomShardingFunction[uint32, *wsConnect](func(key uint32) uint32 { - return key - }), + //wsConnAll: cmap.NewWithCustomShardingFunction[uint32, *wsConnect](func(key uint32) uint32 { + // return key + //}), } } func (m *wsManager) Add(conn *wsConnect) { - m.wsConnAll.Set(conn.id, conn) + m.wsConnAll.Store(conn.id, conn) } func (m *wsManager) SetUserId(connId uint32, userId int64) { @@ -28,18 +30,32 @@ func (m *wsManager) Remove(conn *wsConnect) { if conn.UserId() > 0 { userMgr.Remove(conn.UserId()) } - m.wsConnAll.Remove(conn.id) + m.wsConnAll.Delete(conn.id) } func (m *wsManager) Get(connId uint32) (*wsConnect, bool) { - return m.wsConnAll.Get(connId) + v, ok := m.wsConnAll.Load(connId) + if ok { + conn, ok := v.(*wsConnect) + return conn, ok + } + return nil, false } func (m *wsManager) FindByUserId(userId int64) (*wsConnect, bool) { connId := userMgr.GetConnId(userId) - return m.wsConnAll.Get(connId) + return m.Get(connId) +} + +func (m *wsManager) Rang(cb func(k, v any) bool) { + m.wsConnAll.Range(cb) } func (m *wsManager) Count() int { - return m.wsConnAll.Count() + count := 0 + m.wsConnAll.Range(func(k, v interface{}) bool { + count++ + return true + }) + return count } diff --git a/ws/wsServer.go b/ws/wsServer.go index 69bce45..7afa4be 100644 --- a/ws/wsServer.go +++ b/ws/wsServer.go @@ -55,7 +55,7 @@ func (s *WsServer) wsHandle(w http.ResponseWriter, r *http.Request) { nextConnId++ wsConn := newWsConnect(conn, s.onDisconnect) wsMgr.Add(wsConn) - log.DebugF("当前在线人数:%v", wsMgr.Count()) + log.DebugF("当前连接数:%v", wsMgr.Count()) ksync.GoSafe(func() { wsConn.handle(s.onMessage) }, nil) ksync.GoSafe(wsConn.readWsLoop, nil) ksync.GoSafe(wsConn.writeWsLoop, nil) @@ -79,3 +79,7 @@ func (s *WsServer) SetUserId(connId uint32, userId int64) { func (s *WsServer) FindConnByUserId(userId int64) (IConn, bool) { return wsMgr.FindByUserId(userId) } + +func (s *WsServer) FindConnByConnId(connId uint32) (IConn, bool) { + return wsMgr.Get(connId) +}