receiver.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package zmostp_go
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/ascii85"
  6. "encoding/binary"
  7. "fmt"
  8. "hash/crc32"
  9. "io"
  10. )
// Receiver reads a byte stream, feeds it through a DecoderDFA and publishes
// the successfully decoded frames as Messages on an output channel.
type Receiver struct {
	// reader is the byte source; Run pulls from it one byte at a time.
	reader io.ByteReader
	// outch carries decoded messages to the consumer. It is handed out by
	// GetMessageOutChannel and closed by Run when its context is cancelled.
	outch chan *Message
	// dfa receives every byte read from reader and emits raw frames that are
	// then decoded by decodeMessageFromDFAOut.
	dfa *DecoderDFA
	// decodeFailureLogHdl is notified whenever a frame cannot be decoded
	// (ASCII85 errors, wrong field lengths, missing or mismatching checksum).
	decodeFailureLogHdl DecodeFailureLogHandler
	// enableCRC makes a valid checksum mandatory: frames without one, or whose
	// checksum does not match, are reported as failures and dropped.
	enableCRC bool
}
  18. func NewReceiver(cfg *ReceiverConfig) *Receiver {
  19. t := &Receiver{
  20. reader: cfg.ByteReader,
  21. outch: make(chan *Message),
  22. enableCRC: cfg.EnableCRC,
  23. }
  24. if cfg.DecodeFailureLog == nil {
  25. t.decodeFailureLogHdl = &EmptyDecodeFailureLogHandler{}
  26. t.dfa = NewDecoderDFA(&EmptyDecodeFailureLogHandler{})
  27. } else {
  28. t.decodeFailureLogHdl = cfg.DecodeFailureLog
  29. t.dfa = NewDecoderDFA(cfg.DecodeFailureLog)
  30. }
  31. return t
  32. }
// GetMessageOutChannel returns the channel on which decoded Messages are
// delivered. It is the receiver's own channel, not a copy: Run closes it when
// its context is cancelled, so consumers should be prepared for the channel
// to be closed.
func (t *Receiver) GetMessageOutChannel() chan *Message {
	return t.outch
}
  36. func (t *Receiver) Run(ctx context.Context) error {
  37. recvErrCh := make(chan error)
  38. recvByteCh := make(chan byte)
  39. dfaOutCh := t.dfa.GetOutChannel()
  40. go func() {
  41. for {
  42. b, err := t.reader.ReadByte()
  43. if err != nil {
  44. if err == io.EOF {
  45. recvErrCh <- nil
  46. return
  47. } else {
  48. recvErrCh <- err
  49. return
  50. }
  51. }
  52. recvByteCh <- b
  53. }
  54. }()
  55. for {
  56. select {
  57. case <-ctx.Done():
  58. close(recvErrCh)
  59. close(recvByteCh)
  60. close(t.outch)
  61. t.dfa.Shutdown()
  62. return nil
  63. case e, ok := <-recvErrCh:
  64. if !ok {
  65. return nil
  66. }
  67. return fmt.Errorf("failed recv from stream: %w", e)
  68. case b, ok := <-recvByteCh:
  69. if !ok {
  70. return nil
  71. }
  72. t.dfa.PushByte(b)
  73. case dom, ok := <-dfaOutCh:
  74. if !ok {
  75. return nil
  76. }
  77. t.decodeMessageFromDFAOut(dom)
  78. }
  79. }
  80. }
  81. func (t *Receiver) decodeMessageFromDFAOut(msg *DFAOutMsg) {
  82. decChannel := ascii85.NewDecoder(bytes.NewReader(msg.RawChannel))
  83. decPayload := ascii85.NewDecoder(bytes.NewReader(msg.RawPayload))
  84. chbyte, err := io.ReadAll(decChannel)
  85. if err != nil {
  86. t.decodeFailureLogHdl.DecodeChannelAscii85Failed(msg.RawChannel, err)
  87. return
  88. }
  89. if len(chbyte) != 4 {
  90. t.decodeFailureLogHdl.InvalidChannelByteLength(chbyte)
  91. return
  92. }
  93. chnum := binary.LittleEndian.Uint32(chbyte)
  94. payload, err := io.ReadAll(decPayload)
  95. if err != nil {
  96. t.decodeFailureLogHdl.DecodePayloadAscii85Failed(chnum, msg.RawPayload, err)
  97. return
  98. }
  99. if t.enableCRC {
  100. if len(msg.Checksum) <= 0 {
  101. t.decodeFailureLogHdl.NeedCRCChecksum()
  102. return
  103. }
  104. decCRC := ascii85.NewDecoder(bytes.NewReader(msg.Checksum))
  105. crcBytes, err := io.ReadAll(decCRC)
  106. if err != nil {
  107. t.decodeFailureLogHdl.DecodeChecksumAscii85Failed(chnum, msg.RawPayload, err)
  108. return
  109. }
  110. if len(crcBytes) != 4 {
  111. t.decodeFailureLogHdl.InvalidChecksumByteLength(chnum, crcBytes)
  112. return
  113. }
  114. crcNum := binary.LittleEndian.Uint32(crcBytes)
  115. crcABytes := make([]byte, 0, len(payload)+4)
  116. crcABytes = append(crcABytes, chbyte...)
  117. crcABytes = append(crcABytes, payload...)
  118. crcAChecksum := crc32.ChecksumIEEE(crcABytes)
  119. if crcNum != crcAChecksum {
  120. t.decodeFailureLogHdl.CRCChecksumMismached(crcAChecksum, crcNum)
  121. return
  122. }
  123. }
  124. omsg := &Message{
  125. Channel: chnum,
  126. Payload: payload,
  127. }
  128. t.outch <- omsg
  129. }