// Package zsshrpc_server accepts SSH connections and serves "zsshrpc"
// channels on them, dispatching each completed request to a
// ZSshRpcOperationHandler.
package zsshrpc_server

import (
	"fmt"
	"net"

	"golang.org/x/crypto/ssh"
)

// ZSshRpcSessionContext holds the state shared by every channel opened on one
// SSH connection. It is populated by HandleNewSession.
type ZSshRpcSessionContext struct {
	NetConn          net.Conn
	ServerConn       *ssh.ServerConn
	NewChannelChan   <-chan ssh.NewChannel
	ReqChan          <-chan *ssh.Request
	OperationHandler ZSshRpcOperationHandler
	Logger           SvrLogFunc
}

// ZSshRpcChannelContext holds the state of a single accepted "zsshrpc"
// channel: the SSH channel itself, the receive state machine, the payloads
// collected so far and the response produced by the operation handler.
type ZSshRpcChannelContext struct {
	Channel      ssh.Channel
	ReqChan      <-chan *ssh.Request
	SessionCtx   *ZSshRpcSessionContext
	Logger       SvrLogFunc
	RxState      ZSshRpcChState
	RxMethod     ZSshRpcMethod
	RxBuf        ThreadSafeBuffer
	UriStr       string
	JsonStr      string
	ResponseData ZSshRpcOperationResponse
}

// HandleNewSession performs the SSH server handshake on conn using sshcfg and,
// on success, starts background goroutines that discard global requests and
// serve incoming channels. A failed handshake is logged at WARNING level and
// the function returns without starting anything.
//
// handler receives every completed RPC request; IOBlockSize is the size of the
// per-channel read buffer.
func HandleNewSession(conn net.Conn, sshcfg *ssh.ServerConfig, logger SvrLogFunc, handler ZSshRpcOperationHandler, IOBlockSize int) {
	sconn, schan, reqchan, err := ssh.NewServerConn(conn, sshcfg)
	if err != nil {
		logger(SvrLogLevel_WARNING, fmt.Sprintf("Failed Handling Session From Client '%v'", conn.RemoteAddr()), err)
		return
	}

	rpcSessCtx := &ZSshRpcSessionContext{
		NetConn:          conn,
		ServerConn:       sconn,
		NewChannelChan:   schan,
		ReqChan:          reqchan,
		OperationHandler: handler,
		Logger:           logger,
	}

	go ssh.DiscardRequests(reqchan)
	go handleChannels(rpcSessCtx, IOBlockSize)
}

// try recovers from a panic in the calling goroutine and logs it at WARNING
// level. It must be installed with `defer try(logger)` so that recover is
// called directly by the deferred function.
func try(Logger SvrLogFunc) {
	r := recover()
	if r == nil {
		return
	}

	einf, isErr := r.(error)
	if !isErr {
		// Keep the panic value in the log instead of discarding it.
		einf = fmt.Errorf("%v", r)
	}
	Logger(SvrLogLevel_WARNING, "Internal Error Recovered", einf)
}

// handleChannels serves every channel-open request on the session, each in its
// own goroutine, until NewChannelChan is closed.
func handleChannels(ctx *ZSshRpcSessionContext, IOBlockSize int) {
	defer try(ctx.Logger)

	for newchan := range ctx.NewChannelChan {
		go handleChannel(ctx, newchan, IOBlockSize)
	}
}

// handleChannel accepts nch when its type is "zsshrpc" and then serves it
// until it ends. Any other channel type is rejected. Both a rejected channel
// type and a failed Accept close the whole server connection.
func handleChannel(sessctx *ZSshRpcSessionContext, nch ssh.NewChannel, IOBlockSize int) {
	defer try(sessctx.Logger)

	if chType := nch.ChannelType(); chType != "zsshrpc" {
		sessctx.Logger(SvrLogLevel_INFO, fmt.Sprintf("Unsupported Channel Type '%s' From Client '%v'", chType, sessctx.NetConn.RemoteAddr()), nil)
		// The Reject/Close errors are deliberately ignored: the connection is
		// being torn down and there is nothing further to do with them.
		nch.Reject(ssh.UnknownChannelType, "Unsupported Channel Type.")
		sessctx.ServerConn.Close()
		return
	}

	ch, req, err := nch.Accept()
	if err != nil {
		sessctx.Logger(SvrLogLevel_INFO, fmt.Sprintf("Failed Handler Channel From '%v'", sessctx.NetConn.RemoteAddr()), err)
		sessctx.ServerConn.Close()
		return
	}

	chctx := &ZSshRpcChannelContext{
		SessionCtx: sessctx,
		Channel:    ch,
		ReqChan:    req,
		Logger:     sessctx.Logger,
		RxState:    ChannelState_IDLE,
	}
	// This function already runs in a goroutine of its own, so the channel is
	// served here rather than in yet another goroutine.
	handleZSshRpcChannel(chctx, IOBlockSize)
}

// readChannelData reads from the channel into chctx.RxBuf until one
// NUL-terminated message has been received. The terminating NUL is not stored.
//
// b is the scratch buffer used for each Read; recvBufLen, when positive and
// smaller than len(b), limits how much of b is used. It returns true when a
// terminator was seen and false when the channel reported an error or
// returned no data before one arrived.
//
// The terminator is only recognised as the last byte of a read.
// NOTE(review): if a peer can send two messages back to back they may arrive
// in a single read and the inner NUL would be missed - confirm against the
// client implementation.
func readChannelData(b []byte, recvBufLen int, chctx *ZSshRpcChannelContext) bool {
	if recvBufLen > 0 && recvBufLen < len(b) {
		b = b[:recvBufLen]
	}

	for {
		n, err := chctx.Channel.Read(b)
		if n > 0 {
			// Only b[:n] is valid; the rest of b holds bytes from earlier reads.
			chunk := b[:n]
			if chunk[n-1] == 0 {
				chctx.RxBuf.Write(chunk[:n-1])
				return true
			}
			chctx.RxBuf.Write(chunk)
		}
		if err != nil || n == 0 {
			return false
		}
	}
}

// rpcMethodForRequest maps the request type that announces an RPC method to
// the corresponding method constant. The second result is false for any other
// request type.
func rpcMethodForRequest(reqType string) (ZSshRpcMethod, bool) {
	switch reqType {
	case "CALL":
		return RpcMethod_CALL, true
	case "ADD":
		return RpcMethod_ADD, true
	case "DEL":
		return RpcMethod_DEL, true
	case "GET":
		return RpcMethod_GET, true
	case "SET":
		return RpcMethod_SET, true
	}
	var none ZSshRpcMethod
	return none, false
}

// writeNulTerminated sends payload followed by a single NUL byte on the
// channel in one Write call and logs a failed write at WARNING level.
func writeNulTerminated(chctx *ZSshRpcChannelContext, payload string) {
	out := make([]byte, len(payload)+1) // the final byte stays zero: the terminator
	copy(out, payload)
	if _, err := chctx.Channel.Write(out); err != nil {
		chctx.Logger(SvrLogLevel_WARNING, "Failed Writing Response To Channel", err)
	}
}

// handleZSshRpcChannel runs the request state machine for one accepted channel
// until its request stream is closed.
//
// Channel requests drive the states; payloads (URI, then JSON) arrive as
// NUL-terminated messages on the channel's data stream and are collected by a
// reader goroutine:
//
//	IDLE           --"ZSSHRPC-1.0"-------------> WAIT_URI
//	WAIT_URI       --CALL/ADD/DEL/GET/SET------> WAIT_JSON       (reads URI)
//	WAIT_JSON      --"ENDREQ"------------------> WAIT_JSON_READ  (reads JSON, runs handler, writes status)
//	WAIT_JSON_READ --"GETJSON"-----------------> IDLE            (writes response JSON)
//
// Any unexpected request type returns the channel to IDLE. Every request that
// wants a reply is answered with success.
func handleZSshRpcChannel(chctx *ZSshRpcChannelContext, IOBlockSize int) {
	defer try(chctx.Logger)
	// Do not leave the channel half-open when the loop ends or a handler
	// panic unwinds this function. The Close error is ignored because the
	// channel may already have been closed by the peer.
	defer chctx.Channel.Close()

	const fallbackBlockSize = 4096
	if IOBlockSize <= 0 {
		// A zero-length read buffer could never receive any data.
		IOBlockSize = fallbackBlockSize
	}

	msgReady := make(chan int)  // one signal per complete message; closed when the reader stops
	done := make(chan struct{}) // closed when this function returns
	defer close(done)

	go func() {
		defer try(chctx.Logger)
		defer close(msgReady)

		ioblockbuf := make([]byte, IOBlockSize)
		for readChannelData(ioblockbuf, IOBlockSize, chctx) {
			select {
			case msgReady <- 1:
			case <-done:
				return
			}
		}
	}()

	// awaitMessage blocks until the reader reports a complete message, then
	// returns and clears the buffered payload. complete is false when the
	// reader stopped before a terminator arrived.
	awaitMessage := func() (payload string, complete bool) {
		_, complete = <-msgReady
		payload = chctx.RxBuf.String()
		chctx.RxBuf.Reset()
		return payload, complete
	}

	// Ranging ends cleanly once the request channel is closed.
	for req := range chctx.ReqChan {
		switch chctx.RxState {
		case ChannelState_IDLE:
			if req.Type == "ZSSHRPC-1.0" {
				chctx.RxState = ChannelState_WAIT_URI
				chctx.RxBuf.Reset()
			}

		case ChannelState_WAIT_URI:
			method, known := rpcMethodForRequest(req.Type)
			if !known {
				chctx.RxState = ChannelState_IDLE
				chctx.RxBuf.Reset()
				break
			}
			uri, complete := awaitMessage()
			if !complete {
				chctx.Logger(SvrLogLevel_INFO, "Channel Data Ended Before URI Was Complete", nil)
				chctx.RxState = ChannelState_IDLE
				break
			}
			chctx.RxMethod = method
			chctx.UriStr = uri
			chctx.RxState = ChannelState_WAIT_JSON
			chctx.Logger(SvrLogLevel_DEBUG, fmt.Sprint("Recv URI: ", chctx.UriStr), nil)

		case ChannelState_WAIT_JSON:
			if req.Type != "ENDREQ" {
				chctx.RxState = ChannelState_IDLE
				chctx.RxBuf.Reset()
				break
			}
			body, complete := awaitMessage()
			if !complete {
				chctx.Logger(SvrLogLevel_INFO, "Channel Data Ended Before JSON Was Complete", nil)
				chctx.RxState = ChannelState_IDLE
				break
			}
			chctx.JsonStr = body
			chctx.RxState = ChannelState_EXEC_HANDLER

			opReq := ZSshRpcOperationRequest{
				ChannelContext: chctx,
				Method:         chctx.RxMethod,
				URI:            chctx.UriStr,
				JSON:           chctx.JsonStr,
			}
			chctx.ResponseData = chctx.SessionCtx.OperationHandler.HandleOperation(opReq)

			rcstring := fmt.Sprintf(
				"%d - %s",
				chctx.ResponseData.StatusCode,
				GetResponseStatusCodeString(chctx.ResponseData.StatusCode),
			)
			writeNulTerminated(chctx, rcstring)
			chctx.RxState = ChannelState_WAIT_JSON_READ

		case ChannelState_EXEC_HANDLER:
			chctx.RxState = ChannelState_IDLE
			chctx.RxBuf.Reset()

		case ChannelState_WAIT_JSON_READ:
			if req.Type == "GETJSON" {
				writeNulTerminated(chctx, chctx.ResponseData.ResponseJSON)
			}
			chctx.RxState = ChannelState_IDLE
		}

		if req.WantReply {
			if err := req.Reply(true, nil); err != nil {
				chctx.Logger(SvrLogLevel_WARNING, fmt.Sprintf("Failed Replying To Request '%s'", req.Type), err)
			}
		}
	}
}