|
@@ -0,0 +1,261 @@
|
|
|
+package zmostp_go
|
|
|
+
|
|
|
+import (
|
|
|
+ "bytes"
|
|
|
+ "context"
|
|
|
+ "encoding/ascii85"
|
|
|
+ "encoding/binary"
|
|
|
+ "fmt"
|
|
|
+ "hash/crc32"
|
|
|
+ "io"
|
|
|
+ "sync"
|
|
|
+)
|
|
|
+
|
|
|
+type Transporter struct {
|
|
|
+ reader io.ByteReader
|
|
|
+ writer io.Writer
|
|
|
+ outch chan *Message
|
|
|
+ inch chan *Message
|
|
|
+ humanFriendlySending bool
|
|
|
+ optLock sync.RWMutex
|
|
|
+ dfa *DecoderDFA
|
|
|
+ decodeFailureLogHdl DecodeFailureLogHandler
|
|
|
+ enableCRC bool
|
|
|
+}
|
|
|
+
|
|
|
+func NewTransporter(cfg *TransporterConfig) *Transporter {
|
|
|
+ t := &Transporter{
|
|
|
+ reader: cfg.Reader,
|
|
|
+ writer: cfg.Writer,
|
|
|
+ outch: make(chan *Message),
|
|
|
+ inch: make(chan *Message),
|
|
|
+ humanFriendlySending: cfg.HumanFriendlySending,
|
|
|
+ optLock: sync.RWMutex{},
|
|
|
+ enableCRC: cfg.EnableCRC,
|
|
|
+ }
|
|
|
+ if cfg.DecodeFailureLog == nil {
|
|
|
+ t.decodeFailureLogHdl = &EmptyDecodeFailureLogHandler{}
|
|
|
+ t.dfa = NewDecoderDFA(&EmptyDecodeFailureLogHandler{})
|
|
|
+ } else {
|
|
|
+ t.decodeFailureLogHdl = cfg.DecodeFailureLog
|
|
|
+ t.dfa = NewDecoderDFA(cfg.DecodeFailureLog)
|
|
|
+ }
|
|
|
+ return t
|
|
|
+}
|
|
|
+
|
|
|
+func (t *Transporter) GetMessageInChannel() chan *Message {
|
|
|
+ return t.inch
|
|
|
+}
|
|
|
+
|
|
|
+func (t *Transporter) GetMessageOutChannel() chan *Message {
|
|
|
+ return t.outch
|
|
|
+}
|
|
|
+
|
|
|
+func (t *Transporter) Run(ctx context.Context) error {
|
|
|
+ t.dfa.Reset()
|
|
|
+ var errSend, errRecv, errPipeCopy error
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ wg.Add(2)
|
|
|
+ pctx, cncl := context.WithCancel(ctx)
|
|
|
+ go func() {
|
|
|
+ errSend = t.doSend(pctx)
|
|
|
+ cncl()
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ go func() {
|
|
|
+ errRecv = t.doRecv(pctx)
|
|
|
+ cncl()
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ wg.Wait()
|
|
|
+ if errSend != nil {
|
|
|
+ return errSend
|
|
|
+ }
|
|
|
+ if errRecv != nil {
|
|
|
+ return errRecv
|
|
|
+ }
|
|
|
+ if errPipeCopy != nil {
|
|
|
+ if errPipeCopy == io.EOF {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return errPipeCopy
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (t *Transporter) doRecv(ctx context.Context) error {
|
|
|
+ recvErrCh := make(chan error)
|
|
|
+ recvByteCh := make(chan byte)
|
|
|
+ dfaOutCh := t.dfa.GetOutChannel()
|
|
|
+ go func() {
|
|
|
+ for {
|
|
|
+ b, err := t.reader.ReadByte()
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ recvErrCh <- nil
|
|
|
+ return
|
|
|
+ } else {
|
|
|
+ recvErrCh <- err
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ recvByteCh <- b
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ close(recvErrCh)
|
|
|
+ close(recvByteCh)
|
|
|
+ close(t.outch)
|
|
|
+ t.dfa.Shutdown()
|
|
|
+ return nil
|
|
|
+ case e, ok := <-recvErrCh:
|
|
|
+ if !ok {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return fmt.Errorf("failed recv from stream: %w", e)
|
|
|
+ case b, ok := <-recvByteCh:
|
|
|
+ if !ok {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ t.dfa.PushByte(b)
|
|
|
+ case dom, ok := <-dfaOutCh:
|
|
|
+ if !ok {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ t.decodeMessageFromDFAOut(dom)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (t *Transporter) doSend(ctx context.Context) error {
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return nil
|
|
|
+ case out, ok := <-t.inch:
|
|
|
+ if !ok {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ err := t.encodeSendMessage(out, t.writer)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (t *Transporter) SetHumanFriendOutputOption(b bool) {
|
|
|
+ defer t.optLock.Unlock()
|
|
|
+ t.optLock.Lock()
|
|
|
+ t.humanFriendlySending = b
|
|
|
+}
|
|
|
+
|
|
|
+func (t *Transporter) encodeSendMessage(msg *Message, w io.Writer) error {
|
|
|
+ defer t.optLock.RUnlock()
|
|
|
+ t.optLock.RLock()
|
|
|
+ var buf *bytes.Buffer
|
|
|
+ if t.humanFriendlySending {
|
|
|
+ buf = bytes.NewBufferString("{\n\t")
|
|
|
+ } else {
|
|
|
+ buf = bytes.NewBuffer([]byte{'{'})
|
|
|
+ }
|
|
|
+ asc85end := ascii85.NewEncoder(buf)
|
|
|
+ chnBytes := binary.LittleEndian.AppendUint32([]byte{}, msg.Channel)
|
|
|
+ _, err := asc85end.Write(chnBytes)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to encode chnum of message: ascii85 encoder write error: %w", err)
|
|
|
+ }
|
|
|
+ err = asc85end.Close()
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to encode chnum of message: ascii85 encoder close error: %w", err)
|
|
|
+ }
|
|
|
+ if t.humanFriendlySending {
|
|
|
+ buf.WriteString("\n\t|\n\t")
|
|
|
+ } else {
|
|
|
+ buf.WriteByte('|')
|
|
|
+ }
|
|
|
+ asc85end = ascii85.NewEncoder(buf)
|
|
|
+ _, err = asc85end.Write(msg.Payload)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to encode payload of message: ascii85 encoder write error: %w", err)
|
|
|
+ }
|
|
|
+ err = asc85end.Close()
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to encode payload of message: ascii85 encoder close error: %w", err)
|
|
|
+ }
|
|
|
+ if t.enableCRC {
|
|
|
+ if t.humanFriendlySending {
|
|
|
+ buf.WriteString("\n\t|\n\t")
|
|
|
+ } else {
|
|
|
+ buf.WriteByte('|')
|
|
|
+ }
|
|
|
+ crcBytes := make([]byte, 0, len(msg.Payload)+4)
|
|
|
+ crcBytes = append(crcBytes, chnBytes...)
|
|
|
+ crcBytes = append(crcBytes, msg.Payload...)
|
|
|
+ crcChecksum := crc32.ChecksumIEEE(crcBytes)
|
|
|
+ crcChsBytes := binary.LittleEndian.AppendUint32([]byte{}, crcChecksum)
|
|
|
+ asc85end = ascii85.NewEncoder(buf)
|
|
|
+ _, err = asc85end.Write(crcChsBytes)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to encode checksum of message: ascii85 encoder write error: %w", err)
|
|
|
+ }
|
|
|
+ err = asc85end.Close()
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to encode checksum of message: ascii85 encoder close error: %w", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if t.humanFriendlySending {
|
|
|
+ buf.WriteString("\n}\n")
|
|
|
+ } else {
|
|
|
+ buf.WriteByte('}')
|
|
|
+ }
|
|
|
+ _, err = w.Write(buf.Bytes())
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to write message to stream: %w", err)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (t *Transporter) decodeMessageFromDFAOut(msg *DFAOutMsg) {
|
|
|
+ decChannel := ascii85.NewDecoder(bytes.NewReader(msg.RawChannel))
|
|
|
+ decPayload := ascii85.NewDecoder(bytes.NewReader(msg.RawPayload))
|
|
|
+ chbyte, err := io.ReadAll(decChannel)
|
|
|
+ if err != nil {
|
|
|
+ t.decodeFailureLogHdl.DecodeChannelAscii85Failed(msg.RawChannel, err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ chnum := binary.LittleEndian.Uint32(chbyte)
|
|
|
+ payload, err := io.ReadAll(decPayload)
|
|
|
+ if err != nil {
|
|
|
+ t.decodeFailureLogHdl.DecodePayloadAscii85Failed(chnum, msg.RawPayload, err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if t.enableCRC {
|
|
|
+ if len(msg.Checksum) <= 0 {
|
|
|
+ t.decodeFailureLogHdl.NeedCRCChecksum()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ decCRC := ascii85.NewDecoder(bytes.NewReader(msg.Checksum))
|
|
|
+ crcBytes, err := io.ReadAll(decCRC)
|
|
|
+ if err != nil {
|
|
|
+ t.decodeFailureLogHdl.DecodeChecksumAscii85Failed(chnum, msg.RawPayload, err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ crcNum := binary.LittleEndian.Uint32(crcBytes)
|
|
|
+ crcABytes := make([]byte, 0, len(payload)+4)
|
|
|
+ crcABytes = append(crcABytes, chbyte...)
|
|
|
+ crcABytes = append(crcABytes, payload...)
|
|
|
+ crcAChecksum := crc32.ChecksumIEEE(crcABytes)
|
|
|
+ if crcNum != crcAChecksum {
|
|
|
+ t.decodeFailureLogHdl.CRCChecksumMismached(crcAChecksum, crcNum)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ omsg := &Message{
|
|
|
+ Channel: chnum,
|
|
|
+ Payload: payload,
|
|
|
+ }
|
|
|
+ t.outch <- omsg
|
|
|
+}
|