123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- package wslogdist
- import (
- "bytes"
- "fmt"
- "git.swzry.com/zry/gorillaws2netconn"
- "github.com/GUAIK-ORG/go-snowflake/snowflake"
- "github.com/gin-gonic/gin"
- "github.com/gorilla/websocket"
- "net/http"
- "sync"
- "time"
- )
- type WsSessions struct {
- Conn *gorillaws2netconn.NetConn4Gorilla
- BeginTime time.Time
- }
- func NewWsSessions(conn *gorillaws2netconn.NetConn4Gorilla) *WsSessions {
- s := &WsSessions{
- Conn: conn,
- BeginTime: time.Now(),
- }
- return s
- }
- type WsLogDistributor struct {
- wsList map[string]*WsSessions
- lock sync.RWMutex
- upgrader *websocket.Upgrader
- startLogBuffer *bytes.Buffer
- startLogBufferSize int
- snowFlaker *snowflake.Snowflake
- }
- func NewWsLogDistributor(startLogBufferSize int) *WsLogDistributor {
- d := &WsLogDistributor{
- wsList: make(map[string]*WsSessions),
- startLogBufferSize: startLogBufferSize,
- }
- d.upgrader = &websocket.Upgrader{
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- }
- bbuf := make([]byte, 0, startLogBufferSize)
- d.startLogBuffer = bytes.NewBuffer(bbuf)
- flake, err := snowflake.NewSnowflake(int64(0), int64(0))
- if err != nil {
- panic(err)
- }
- d.snowFlaker = flake
- return d
- }
- func (w *WsLogDistributor) Write(p []byte) (n int, err error) {
- if w.startLogBuffer.Len() < w.startLogBufferSize {
- w.startLogBuffer.Write(p)
- }
- defer w.lock.Unlock()
- w.lock.Lock()
- for k, v := range w.wsList {
- _, err := v.Conn.Write(p)
- if err != nil {
- _ = v.Conn.WS.Close()
- delete(w.wsList, k)
- }
- }
- return len(p), nil
- }
- func (w *WsLogDistributor) HandleNewConnections(ctx *gin.Context) {
- ustr := fmt.Sprintf("%16X", w.snowFlaker.NextVal())
- ws, err := w.upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
- if err != nil {
- ctx.JSON(500, gin.H{
- "suc": false,
- "err_msg": "failed upgrading to websocket",
- })
- return
- }
- nconn := &gorillaws2netconn.NetConn4Gorilla{WS: ws, WriteMessageType: websocket.TextMessage}
- w.lock.Lock()
- sess := NewWsSessions(nconn)
- w.wsList[ustr] = sess
- w.lock.Unlock()
- buf := make([]byte, 1024)
- kc := make(chan int)
- go func() {
- for {
- _, err := sess.Conn.Read(buf)
- if err != nil {
- kc <- 0
- return
- }
- }
- }()
- <-kc
- _ = sess.Conn.Close()
- w.lock.Lock()
- delete(w.wsList, ustr)
- w.lock.Unlock()
- }
- func (w *WsLogDistributor) ClearConnections() {
- defer w.lock.Unlock()
- w.lock.Lock()
- for k, v := range w.wsList {
- _ = v.Conn.Close()
- delete(w.wsList, k)
- }
- }
- func (w *WsLogDistributor) GetClientsInfo() map[string]map[string]interface{} {
- defer w.lock.RUnlock()
- w.lock.RLock()
- now := time.Now()
- m := map[string]map[string]interface{}{}
- for k, v := range w.wsList {
- m[k] = map[string]interface{}{
- "begin_time": v.BeginTime.Unix(),
- "begin_time_in_rfc3399": v.BeginTime.Format(time.RFC3339),
- "begin_to_now_nanoseconds": now.Sub(v.BeginTime),
- "begin_to_now_str": now.Sub(v.BeginTime).String(),
- "remote_addr": v.Conn.RemoteAddr().String(),
- "local_addr": v.Conn.LocalAddr().String(),
- }
- }
- return m
- }
- func (w *WsLogDistributor) GetStartLog() string {
- return w.startLogBuffer.String()
- }
- func (w *WsLogDistributor) ClearStartLog() {
- w.startLogBuffer.Reset()
- }
|