123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- package zsshrpc_server
- import (
- "fmt"
- "golang.org/x/crypto/ssh"
- "net"
- )
- type ZSshRpcSessionContext struct {
- NetConn net.Conn
- ServerConn *ssh.ServerConn
- NewChannelChan <-chan ssh.NewChannel
- ReqChan <-chan *ssh.Request
- OperationHandler ZSshRpcOperationHandler
- Logger SvrLogFunc
- }
- type ZSshRpcChannelContext struct {
- Channel ssh.Channel
- ReqChan <-chan *ssh.Request
- SessionCtx *ZSshRpcSessionContext
- Logger SvrLogFunc
- RxState ZSshRpcChState
- RxMethod ZSshRpcMethod
- RxBuf ThreadSafeBuffer
- UriStr string
- JsonStr string
- ResponseData ZSshRpcOperationResponse
- }
- func HandleNewSession(conn net.Conn, sshcfg *ssh.ServerConfig, logger SvrLogFunc, handler ZSshRpcOperationHandler, IOBlockSize int) {
- sconn, schan, reqchan, err := ssh.NewServerConn(conn, sshcfg)
- if err != nil {
- logger(SvrLogLevel_WARNING, fmt.Sprintf("Failed Handling Session From Client '%v'", conn.RemoteAddr()), err)
- return
- }
- rpcSessCtx := &ZSshRpcSessionContext{
- NetConn: conn,
- ServerConn: sconn,
- NewChannelChan: schan,
- ReqChan: reqchan,
- OperationHandler: handler,
- Logger: logger,
- }
- go ssh.DiscardRequests(reqchan)
- go handleChannels(rpcSessCtx, IOBlockSize)
- }
- func try(Logger SvrLogFunc) {
- if err := recover(); err != nil {
- var einf error
- switch err.(type) {
- case error:
- einf = err.(error)
- break
- default:
- einf = nil
- break
- }
- Logger(SvrLogLevel_WARNING, "Internal Error Recovered", einf)
- }
- }
- func handleChannels(ctx *ZSshRpcSessionContext, IOBlockSize int) {
- defer try(ctx.Logger)
- for newchan := range ctx.NewChannelChan {
- go handleChannel(ctx, newchan, IOBlockSize)
- }
- }
- func handleChannel(sessctx *ZSshRpcSessionContext, nch ssh.NewChannel, IOBlockSize int) {
- defer try(sessctx.Logger)
- if nch.ChannelType() == "zsshrpc" {
- ch, req, err := nch.Accept()
- if err != nil {
- sessctx.Logger(SvrLogLevel_INFO, fmt.Sprintf("Failed Handler Channel From '%v'", sessctx.NetConn.RemoteAddr()), err)
- sessctx.ServerConn.Close()
- } else {
- chctx := &ZSshRpcChannelContext{
- SessionCtx: sessctx,
- Channel: ch,
- ReqChan: req,
- Logger: sessctx.Logger,
- RxState: ChannelState_IDLE,
- }
- go handleZSshRpcChannel(chctx, IOBlockSize)
- }
- } else {
- sessctx.Logger(SvrLogLevel_INFO, fmt.Sprintf("Unsupported Channel Type '%s' From Client '%v'", nch.ChannelType(), sessctx.NetConn.RemoteAddr()), nil)
- nch.Reject(ssh.UnknownChannelType, "Unsupported Channel Type.")
- sessctx.ServerConn.Close()
- }
- }
- func readChannelData(b []byte, recvBufLen int, chctx *ZSshRpcChannelContext) {
- for {
- rl, err := chctx.Channel.Read(b)
- if err != nil || rl == 0 {
- break
- }
- if rl < recvBufLen {
- if rl != 0 {
- if b[rl-1] == 0 {
- chctx.RxBuf.Write(b[:rl-1])
- break
- } else {
- chctx.RxBuf.Write(b[:rl])
- }
- }
- } else {
- if b[recvBufLen-1] == 0 {
- chctx.RxBuf.Write(b[:recvBufLen-1])
- break
- } else {
- chctx.RxBuf.Write(b)
- }
- }
- }
- }
- func handleZSshRpcChannel(chctx *ZSshRpcChannelContext, IOBlockSize int) {
- defer try(chctx.Logger)
- ioblockbuf := make([]byte, IOBlockSize)
- waitchan := make(chan int)
- go func() {
- defer try(chctx.Logger)
- for {
- readChannelData(ioblockbuf, IOBlockSize, chctx)
- //chctx.Logger(SvrLogLevel_DEBUG, fmt.Sprintf("<RECV> [len=%d] %s",chctx.RxBuf.Len(), hex.EncodeToString([]byte(chctx.RxBuf.String()))), nil)
- waitchan <- 1
- }
- //io.Copy(&chctx.RxBuf, chctx.Channel)
- }()
- for {
- req := <-chctx.ReqChan
- switch chctx.RxState {
- case ChannelState_IDLE:
- {
- if req.Type == "ZSSHRPC-1.0" {
- chctx.RxState = ChannelState_WAIT_URI
- chctx.RxBuf.Reset()
- }
- }
- break
- case ChannelState_WAIT_URI:
- {
- //chctx.Logger(SvrLogLevel_DEBUG, fmt.Sprintf("[Before URL Clear]Length Of RxBuf: %d", chctx.RxBuf.Len()), nil)
- switch req.Type {
- case "CALL":
- {
- chctx.RxMethod = RpcMethod_CALL
- chctx.RxState = ChannelState_WAIT_JSON
- <-waitchan
- chctx.UriStr = chctx.RxBuf.String()
- chctx.RxBuf.Reset()
- }
- break
- case "ADD":
- {
- chctx.RxMethod = RpcMethod_ADD
- chctx.RxState = ChannelState_WAIT_JSON
- <-waitchan
- chctx.UriStr = chctx.RxBuf.String()
- chctx.RxBuf.Reset()
- }
- break
- case "DEL":
- {
- chctx.RxMethod = RpcMethod_DEL
- chctx.RxState = ChannelState_WAIT_JSON
- <-waitchan
- chctx.UriStr = chctx.RxBuf.String()
- chctx.RxBuf.Reset()
- }
- break
- case "GET":
- {
- chctx.RxMethod = RpcMethod_GET
- chctx.RxState = ChannelState_WAIT_JSON
- <-waitchan
- chctx.UriStr = chctx.RxBuf.String()
- chctx.RxBuf.Reset()
- }
- break
- case "SET":
- {
- chctx.RxMethod = RpcMethod_SET
- chctx.RxState = ChannelState_WAIT_JSON
- <-waitchan
- chctx.UriStr = chctx.RxBuf.String()
- chctx.RxBuf.Reset()
- }
- break
- default:
- {
- chctx.RxState = ChannelState_IDLE
- chctx.RxBuf.Reset()
- }
- break
- }
- chctx.Logger(SvrLogLevel_DEBUG, fmt.Sprint("Recv URI: ", chctx.UriStr), nil)
- //chctx.Logger(SvrLogLevel_DEBUG, fmt.Sprintf("[After URI Clear]Length Of RxBuf: %d", chctx.RxBuf.Len()), nil)
- }
- break
- case ChannelState_WAIT_JSON:
- {
- if req.Type == "ENDREQ" {
- //chctx.Logger(SvrLogLevel_DEBUG, fmt.Sprintf("[Before JSON Clear]Length Of RxBuf: %d", chctx.RxBuf.Len()), nil)
- <-waitchan
- chctx.JsonStr = chctx.RxBuf.String()
- //chctx.Logger(SvrLogLevel_DEBUG, fmt.Sprint("Recv JSON: ", chctx.JsonStr), nil)
- chctx.RxBuf.Reset()
- //chctx.Logger(SvrLogLevel_DEBUG, fmt.Sprintf("[After JSON Clear]Length Of RxBuf: %d", chctx.RxBuf.Len()), nil)
- chctx.RxState = ChannelState_EXEC_HANDLER
- req := ZSshRpcOperationRequest{
- ChannelContext: chctx,
- Method: chctx.RxMethod,
- URI: chctx.UriStr,
- JSON: chctx.JsonStr,
- }
- chctx.ResponseData = chctx.SessionCtx.OperationHandler.HandleOperation(req)
- rcstring := fmt.Sprintf(
- "%d - %s",
- chctx.ResponseData.StatusCode,
- GetResponseStatusCodeString(chctx.ResponseData.StatusCode),
- )
- chctx.Channel.Write([]byte(rcstring))
- chctx.Channel.Write([]byte{0})
- chctx.RxState = ChannelState_WAIT_JSON_READ
- } else {
- chctx.RxState = ChannelState_IDLE
- chctx.RxBuf.Reset()
- }
- }
- break
- case ChannelState_EXEC_HANDLER:
- {
- chctx.RxState = ChannelState_IDLE
- chctx.RxBuf.Reset()
- }
- break
- case ChannelState_WAIT_JSON_READ:
- if req.Type == "GETJSON" {
- chctx.Channel.Write([]byte(chctx.ResponseData.ResponseJSON))
- chctx.Channel.Write([]byte{0})
- }
- chctx.RxState = ChannelState_IDLE
- break
- }
- if req.WantReply {
- req.Reply(true, nil)
- }
- }
- }
|