zmostp_old.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package zmostp_go
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/ascii85"
  6. "encoding/binary"
  7. "fmt"
  8. "hash/crc32"
  9. "io"
  10. "sync"
  11. )
  12. type Transporter struct {
  13. reader io.ByteReader
  14. writer io.Writer
  15. outch chan *Message
  16. inch chan *Message
  17. humanFriendlySending bool
  18. optLock sync.RWMutex
  19. dfa *DecoderDFA
  20. decodeFailureLogHdl DecodeFailureLogHandler
  21. enableCRC bool
  22. }
  23. func NewTransporter(cfg *TransporterConfig) *Transporter {
  24. t := &Transporter{
  25. reader: cfg.Reader,
  26. writer: cfg.Writer,
  27. outch: make(chan *Message),
  28. inch: make(chan *Message),
  29. humanFriendlySending: cfg.HumanFriendlySending,
  30. optLock: sync.RWMutex{},
  31. enableCRC: cfg.EnableCRC,
  32. }
  33. if cfg.DecodeFailureLog == nil {
  34. t.decodeFailureLogHdl = &EmptyDecodeFailureLogHandler{}
  35. t.dfa = NewDecoderDFA(&EmptyDecodeFailureLogHandler{})
  36. } else {
  37. t.decodeFailureLogHdl = cfg.DecodeFailureLog
  38. t.dfa = NewDecoderDFA(cfg.DecodeFailureLog)
  39. }
  40. return t
  41. }
  42. func (t *Transporter) GetMessageInChannel() chan *Message {
  43. return t.inch
  44. }
  45. func (t *Transporter) GetMessageOutChannel() chan *Message {
  46. return t.outch
  47. }
  48. func (t *Transporter) Run(ctx context.Context) error {
  49. t.dfa.Reset()
  50. var errSend, errRecv, errPipeCopy error
  51. wg := &sync.WaitGroup{}
  52. wg.Add(2)
  53. pctx, cncl := context.WithCancel(ctx)
  54. go func() {
  55. errSend = t.doSend(pctx)
  56. cncl()
  57. wg.Done()
  58. }()
  59. go func() {
  60. errRecv = t.doRecv(pctx)
  61. cncl()
  62. wg.Done()
  63. }()
  64. wg.Wait()
  65. if errSend != nil {
  66. return errSend
  67. }
  68. if errRecv != nil {
  69. return errRecv
  70. }
  71. if errPipeCopy != nil {
  72. if errPipeCopy == io.EOF {
  73. return nil
  74. }
  75. return errPipeCopy
  76. }
  77. return nil
  78. }
  79. func (t *Transporter) doRecv(ctx context.Context) error {
  80. recvErrCh := make(chan error)
  81. recvByteCh := make(chan byte)
  82. dfaOutCh := t.dfa.GetOutChannel()
  83. go func() {
  84. for {
  85. b, err := t.reader.ReadByte()
  86. if err != nil {
  87. if err == io.EOF {
  88. recvErrCh <- nil
  89. return
  90. } else {
  91. recvErrCh <- err
  92. return
  93. }
  94. }
  95. recvByteCh <- b
  96. }
  97. }()
  98. for {
  99. select {
  100. case <-ctx.Done():
  101. close(recvErrCh)
  102. close(recvByteCh)
  103. close(t.outch)
  104. t.dfa.Shutdown()
  105. return nil
  106. case e, ok := <-recvErrCh:
  107. if !ok {
  108. return nil
  109. }
  110. return fmt.Errorf("failed recv from stream: %w", e)
  111. case b, ok := <-recvByteCh:
  112. if !ok {
  113. return nil
  114. }
  115. t.dfa.PushByte(b)
  116. case dom, ok := <-dfaOutCh:
  117. if !ok {
  118. return nil
  119. }
  120. t.decodeMessageFromDFAOut(dom)
  121. }
  122. }
  123. }
  124. func (t *Transporter) doSend(ctx context.Context) error {
  125. for {
  126. select {
  127. case <-ctx.Done():
  128. return nil
  129. case out, ok := <-t.inch:
  130. if !ok {
  131. return nil
  132. }
  133. err := t.encodeSendMessage(out, t.writer)
  134. if err != nil {
  135. return err
  136. }
  137. }
  138. }
  139. }
  140. func (t *Transporter) SetHumanFriendOutputOption(b bool) {
  141. defer t.optLock.Unlock()
  142. t.optLock.Lock()
  143. t.humanFriendlySending = b
  144. }
  145. func (t *Transporter) encodeSendMessage(msg *Message, w io.Writer) error {
  146. defer t.optLock.RUnlock()
  147. t.optLock.RLock()
  148. var buf *bytes.Buffer
  149. if t.humanFriendlySending {
  150. buf = bytes.NewBufferString("{\n\t")
  151. } else {
  152. buf = bytes.NewBuffer([]byte{'{'})
  153. }
  154. asc85end := ascii85.NewEncoder(buf)
  155. chnBytes := binary.LittleEndian.AppendUint32([]byte{}, msg.Channel)
  156. _, err := asc85end.Write(chnBytes)
  157. if err != nil {
  158. return fmt.Errorf("failed to encode chnum of message: ascii85 encoder write error: %w", err)
  159. }
  160. err = asc85end.Close()
  161. if err != nil {
  162. return fmt.Errorf("failed to encode chnum of message: ascii85 encoder close error: %w", err)
  163. }
  164. if t.humanFriendlySending {
  165. buf.WriteString("\n\t|\n\t")
  166. } else {
  167. buf.WriteByte('|')
  168. }
  169. asc85end = ascii85.NewEncoder(buf)
  170. _, err = asc85end.Write(msg.Payload)
  171. if err != nil {
  172. return fmt.Errorf("failed to encode payload of message: ascii85 encoder write error: %w", err)
  173. }
  174. err = asc85end.Close()
  175. if err != nil {
  176. return fmt.Errorf("failed to encode payload of message: ascii85 encoder close error: %w", err)
  177. }
  178. if t.enableCRC {
  179. if t.humanFriendlySending {
  180. buf.WriteString("\n\t|\n\t")
  181. } else {
  182. buf.WriteByte('|')
  183. }
  184. crcBytes := make([]byte, 0, len(msg.Payload)+4)
  185. crcBytes = append(crcBytes, chnBytes...)
  186. crcBytes = append(crcBytes, msg.Payload...)
  187. crcChecksum := crc32.ChecksumIEEE(crcBytes)
  188. crcChsBytes := binary.LittleEndian.AppendUint32([]byte{}, crcChecksum)
  189. asc85end = ascii85.NewEncoder(buf)
  190. _, err = asc85end.Write(crcChsBytes)
  191. if err != nil {
  192. return fmt.Errorf("failed to encode checksum of message: ascii85 encoder write error: %w", err)
  193. }
  194. err = asc85end.Close()
  195. if err != nil {
  196. return fmt.Errorf("failed to encode checksum of message: ascii85 encoder close error: %w", err)
  197. }
  198. }
  199. if t.humanFriendlySending {
  200. buf.WriteString("\n}\n")
  201. } else {
  202. buf.WriteByte('}')
  203. }
  204. _, err = w.Write(buf.Bytes())
  205. if err != nil {
  206. return fmt.Errorf("failed to write message to stream: %w", err)
  207. }
  208. return nil
  209. }
  210. func (t *Transporter) decodeMessageFromDFAOut(msg *DFAOutMsg) {
  211. decChannel := ascii85.NewDecoder(bytes.NewReader(msg.RawChannel))
  212. decPayload := ascii85.NewDecoder(bytes.NewReader(msg.RawPayload))
  213. chbyte, err := io.ReadAll(decChannel)
  214. if err != nil {
  215. t.decodeFailureLogHdl.DecodeChannelAscii85Failed(msg.RawChannel, err)
  216. return
  217. }
  218. chnum := binary.LittleEndian.Uint32(chbyte)
  219. payload, err := io.ReadAll(decPayload)
  220. if err != nil {
  221. t.decodeFailureLogHdl.DecodePayloadAscii85Failed(chnum, msg.RawPayload, err)
  222. return
  223. }
  224. if t.enableCRC {
  225. if len(msg.Checksum) <= 0 {
  226. t.decodeFailureLogHdl.NeedCRCChecksum()
  227. return
  228. }
  229. decCRC := ascii85.NewDecoder(bytes.NewReader(msg.Checksum))
  230. crcBytes, err := io.ReadAll(decCRC)
  231. if err != nil {
  232. t.decodeFailureLogHdl.DecodeChecksumAscii85Failed(chnum, msg.RawPayload, err)
  233. return
  234. }
  235. crcNum := binary.LittleEndian.Uint32(crcBytes)
  236. crcABytes := make([]byte, 0, len(payload)+4)
  237. crcABytes = append(crcABytes, chbyte...)
  238. crcABytes = append(crcABytes, payload...)
  239. crcAChecksum := crc32.ChecksumIEEE(crcABytes)
  240. if crcNum != crcAChecksum {
  241. t.decodeFailureLogHdl.CRCChecksumMismached(crcAChecksum, crcNum)
  242. return
  243. }
  244. }
  245. omsg := &Message{
  246. Channel: chnum,
  247. Payload: payload,
  248. }
  249. t.outch <- omsg
  250. }