123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- package zmostp_go
- import (
- "bytes"
- "context"
- "encoding/ascii85"
- "encoding/binary"
- "fmt"
- "hash/crc32"
- "io"
- )
- type Receiver struct {
- reader io.ByteReader
- outch chan *Message
- dfa *DecoderDFA
- decodeFailureLogHdl DecodeFailureLogHandler
- enableCRC bool
- }
- func NewReceiver(cfg *ReceiverConfig) *Receiver {
- t := &Receiver{
- reader: cfg.ByteReader,
- outch: make(chan *Message),
- enableCRC: cfg.EnableCRC,
- }
- if cfg.DecodeFailureLog == nil {
- t.decodeFailureLogHdl = &EmptyDecodeFailureLogHandler{}
- t.dfa = NewDecoderDFA(&EmptyDecodeFailureLogHandler{})
- } else {
- t.decodeFailureLogHdl = cfg.DecodeFailureLog
- t.dfa = NewDecoderDFA(cfg.DecodeFailureLog)
- }
- return t
- }
- func (t *Receiver) GetMessageOutChannel() chan *Message {
- return t.outch
- }
- func (t *Receiver) Run(ctx context.Context) error {
- recvErrCh := make(chan error)
- recvByteCh := make(chan byte)
- dfaOutCh := t.dfa.GetOutChannel()
- go func() {
- for {
- b, err := t.reader.ReadByte()
- if err != nil {
- if err == io.EOF {
- recvErrCh <- nil
- return
- } else {
- recvErrCh <- err
- return
- }
- }
- recvByteCh <- b
- }
- }()
- for {
- select {
- case <-ctx.Done():
- close(recvErrCh)
- close(recvByteCh)
- close(t.outch)
- t.dfa.Shutdown()
- return nil
- case e, ok := <-recvErrCh:
- if !ok {
- return nil
- }
- return fmt.Errorf("failed recv from stream: %w", e)
- case b, ok := <-recvByteCh:
- if !ok {
- return nil
- }
- t.dfa.PushByte(b)
- case dom, ok := <-dfaOutCh:
- if !ok {
- return nil
- }
- t.decodeMessageFromDFAOut(dom)
- }
- }
- }
- func (t *Receiver) decodeMessageFromDFAOut(msg *DFAOutMsg) {
- decChannel := ascii85.NewDecoder(bytes.NewReader(msg.RawChannel))
- decPayload := ascii85.NewDecoder(bytes.NewReader(msg.RawPayload))
- chbyte, err := io.ReadAll(decChannel)
- if err != nil {
- t.decodeFailureLogHdl.DecodeChannelAscii85Failed(msg.RawChannel, err)
- return
- }
- if len(chbyte) != 4 {
- t.decodeFailureLogHdl.InvalidChannelByteLength(chbyte)
- return
- }
- chnum := binary.LittleEndian.Uint32(chbyte)
- payload, err := io.ReadAll(decPayload)
- if err != nil {
- t.decodeFailureLogHdl.DecodePayloadAscii85Failed(chnum, msg.RawPayload, err)
- return
- }
- if t.enableCRC {
- if len(msg.Checksum) <= 0 {
- t.decodeFailureLogHdl.NeedCRCChecksum()
- return
- }
- decCRC := ascii85.NewDecoder(bytes.NewReader(msg.Checksum))
- crcBytes, err := io.ReadAll(decCRC)
- if err != nil {
- t.decodeFailureLogHdl.DecodeChecksumAscii85Failed(chnum, msg.RawPayload, err)
- return
- }
- if len(crcBytes) != 4 {
- t.decodeFailureLogHdl.InvalidChecksumByteLength(chnum, crcBytes)
- return
- }
- crcNum := binary.LittleEndian.Uint32(crcBytes)
- crcABytes := make([]byte, 0, len(payload)+4)
- crcABytes = append(crcABytes, chbyte...)
- crcABytes = append(crcABytes, payload...)
- crcAChecksum := crc32.ChecksumIEEE(crcABytes)
- if crcNum != crcAChecksum {
- t.decodeFailureLogHdl.CRCChecksumMismached(crcAChecksum, crcNum)
- return
- }
- }
- omsg := &Message{
- Channel: chnum,
- Payload: payload,
- }
- t.outch <- omsg
- }
|