|
@@ -1,261 +0,0 @@
|
|
|
-package zmostp_go
|
|
|
-
|
|
|
-import (
|
|
|
- "bytes"
|
|
|
- "context"
|
|
|
- "encoding/ascii85"
|
|
|
- "encoding/binary"
|
|
|
- "fmt"
|
|
|
- "hash/crc32"
|
|
|
- "io"
|
|
|
- "sync"
|
|
|
-)
|
|
|
-
|
|
|
-type Transporter struct {
|
|
|
- reader io.ByteReader
|
|
|
- writer io.Writer
|
|
|
- outch chan *Message
|
|
|
- inch chan *Message
|
|
|
- humanFriendlySending bool
|
|
|
- optLock sync.RWMutex
|
|
|
- dfa *DecoderDFA
|
|
|
- decodeFailureLogHdl DecodeFailureLogHandler
|
|
|
- enableCRC bool
|
|
|
-}
|
|
|
-
|
|
|
-func NewTransporter(cfg *TransporterConfig) *Transporter {
|
|
|
- t := &Transporter{
|
|
|
- reader: cfg.Reader,
|
|
|
- writer: cfg.Writer,
|
|
|
- outch: make(chan *Message),
|
|
|
- inch: make(chan *Message),
|
|
|
- humanFriendlySending: cfg.HumanFriendlySending,
|
|
|
- optLock: sync.RWMutex{},
|
|
|
- enableCRC: cfg.EnableCRC,
|
|
|
- }
|
|
|
- if cfg.DecodeFailureLog == nil {
|
|
|
- t.decodeFailureLogHdl = &EmptyDecodeFailureLogHandler{}
|
|
|
- t.dfa = NewDecoderDFA(&EmptyDecodeFailureLogHandler{})
|
|
|
- } else {
|
|
|
- t.decodeFailureLogHdl = cfg.DecodeFailureLog
|
|
|
- t.dfa = NewDecoderDFA(cfg.DecodeFailureLog)
|
|
|
- }
|
|
|
- return t
|
|
|
-}
|
|
|
-
|
|
|
-func (t *Transporter) GetMessageInChannel() chan *Message {
|
|
|
- return t.inch
|
|
|
-}
|
|
|
-
|
|
|
-func (t *Transporter) GetMessageOutChannel() chan *Message {
|
|
|
- return t.outch
|
|
|
-}
|
|
|
-
|
|
|
-func (t *Transporter) Run(ctx context.Context) error {
|
|
|
- t.dfa.Reset()
|
|
|
- var errSend, errRecv, errPipeCopy error
|
|
|
- wg := &sync.WaitGroup{}
|
|
|
- wg.Add(2)
|
|
|
- pctx, cncl := context.WithCancel(ctx)
|
|
|
- go func() {
|
|
|
- errSend = t.doSend(pctx)
|
|
|
- cncl()
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
- go func() {
|
|
|
- errRecv = t.doRecv(pctx)
|
|
|
- cncl()
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
- wg.Wait()
|
|
|
- if errSend != nil {
|
|
|
- return errSend
|
|
|
- }
|
|
|
- if errRecv != nil {
|
|
|
- return errRecv
|
|
|
- }
|
|
|
- if errPipeCopy != nil {
|
|
|
- if errPipeCopy == io.EOF {
|
|
|
- return nil
|
|
|
- }
|
|
|
- return errPipeCopy
|
|
|
- }
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-func (t *Transporter) doRecv(ctx context.Context) error {
|
|
|
- recvErrCh := make(chan error)
|
|
|
- recvByteCh := make(chan byte)
|
|
|
- dfaOutCh := t.dfa.GetOutChannel()
|
|
|
- go func() {
|
|
|
- for {
|
|
|
- b, err := t.reader.ReadByte()
|
|
|
- if err != nil {
|
|
|
- if err == io.EOF {
|
|
|
- recvErrCh <- nil
|
|
|
- return
|
|
|
- } else {
|
|
|
- recvErrCh <- err
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
- recvByteCh <- b
|
|
|
- }
|
|
|
- }()
|
|
|
- for {
|
|
|
- select {
|
|
|
- case <-ctx.Done():
|
|
|
- close(recvErrCh)
|
|
|
- close(recvByteCh)
|
|
|
- close(t.outch)
|
|
|
- t.dfa.Shutdown()
|
|
|
- return nil
|
|
|
- case e, ok := <-recvErrCh:
|
|
|
- if !ok {
|
|
|
- return nil
|
|
|
- }
|
|
|
- return fmt.Errorf("failed recv from stream: %w", e)
|
|
|
- case b, ok := <-recvByteCh:
|
|
|
- if !ok {
|
|
|
- return nil
|
|
|
- }
|
|
|
- t.dfa.PushByte(b)
|
|
|
- case dom, ok := <-dfaOutCh:
|
|
|
- if !ok {
|
|
|
- return nil
|
|
|
- }
|
|
|
- t.decodeMessageFromDFAOut(dom)
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func (t *Transporter) doSend(ctx context.Context) error {
|
|
|
- for {
|
|
|
- select {
|
|
|
- case <-ctx.Done():
|
|
|
- return nil
|
|
|
- case out, ok := <-t.inch:
|
|
|
- if !ok {
|
|
|
- return nil
|
|
|
- }
|
|
|
- err := t.encodeSendMessage(out, t.writer)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func (t *Transporter) SetHumanFriendOutputOption(b bool) {
|
|
|
- defer t.optLock.Unlock()
|
|
|
- t.optLock.Lock()
|
|
|
- t.humanFriendlySending = b
|
|
|
-}
|
|
|
-
|
|
|
-func (t *Transporter) encodeSendMessage(msg *Message, w io.Writer) error {
|
|
|
- defer t.optLock.RUnlock()
|
|
|
- t.optLock.RLock()
|
|
|
- var buf *bytes.Buffer
|
|
|
- if t.humanFriendlySending {
|
|
|
- buf = bytes.NewBufferString("{\n\t")
|
|
|
- } else {
|
|
|
- buf = bytes.NewBuffer([]byte{'{'})
|
|
|
- }
|
|
|
- asc85end := ascii85.NewEncoder(buf)
|
|
|
- chnBytes := binary.LittleEndian.AppendUint32([]byte{}, msg.Channel)
|
|
|
- _, err := asc85end.Write(chnBytes)
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("failed to encode chnum of message: ascii85 encoder write error: %w", err)
|
|
|
- }
|
|
|
- err = asc85end.Close()
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("failed to encode chnum of message: ascii85 encoder close error: %w", err)
|
|
|
- }
|
|
|
- if t.humanFriendlySending {
|
|
|
- buf.WriteString("\n\t|\n\t")
|
|
|
- } else {
|
|
|
- buf.WriteByte('|')
|
|
|
- }
|
|
|
- asc85end = ascii85.NewEncoder(buf)
|
|
|
- _, err = asc85end.Write(msg.Payload)
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("failed to encode payload of message: ascii85 encoder write error: %w", err)
|
|
|
- }
|
|
|
- err = asc85end.Close()
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("failed to encode payload of message: ascii85 encoder close error: %w", err)
|
|
|
- }
|
|
|
- if t.enableCRC {
|
|
|
- if t.humanFriendlySending {
|
|
|
- buf.WriteString("\n\t|\n\t")
|
|
|
- } else {
|
|
|
- buf.WriteByte('|')
|
|
|
- }
|
|
|
- crcBytes := make([]byte, 0, len(msg.Payload)+4)
|
|
|
- crcBytes = append(crcBytes, chnBytes...)
|
|
|
- crcBytes = append(crcBytes, msg.Payload...)
|
|
|
- crcChecksum := crc32.ChecksumIEEE(crcBytes)
|
|
|
- crcChsBytes := binary.LittleEndian.AppendUint32([]byte{}, crcChecksum)
|
|
|
- asc85end = ascii85.NewEncoder(buf)
|
|
|
- _, err = asc85end.Write(crcChsBytes)
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("failed to encode checksum of message: ascii85 encoder write error: %w", err)
|
|
|
- }
|
|
|
- err = asc85end.Close()
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("failed to encode checksum of message: ascii85 encoder close error: %w", err)
|
|
|
- }
|
|
|
- }
|
|
|
- if t.humanFriendlySending {
|
|
|
- buf.WriteString("\n}\n")
|
|
|
- } else {
|
|
|
- buf.WriteByte('}')
|
|
|
- }
|
|
|
- _, err = w.Write(buf.Bytes())
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("failed to write message to stream: %w", err)
|
|
|
- }
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-func (t *Transporter) decodeMessageFromDFAOut(msg *DFAOutMsg) {
|
|
|
- decChannel := ascii85.NewDecoder(bytes.NewReader(msg.RawChannel))
|
|
|
- decPayload := ascii85.NewDecoder(bytes.NewReader(msg.RawPayload))
|
|
|
- chbyte, err := io.ReadAll(decChannel)
|
|
|
- if err != nil {
|
|
|
- t.decodeFailureLogHdl.DecodeChannelAscii85Failed(msg.RawChannel, err)
|
|
|
- return
|
|
|
- }
|
|
|
- chnum := binary.LittleEndian.Uint32(chbyte)
|
|
|
- payload, err := io.ReadAll(decPayload)
|
|
|
- if err != nil {
|
|
|
- t.decodeFailureLogHdl.DecodePayloadAscii85Failed(chnum, msg.RawPayload, err)
|
|
|
- return
|
|
|
- }
|
|
|
- if t.enableCRC {
|
|
|
- if len(msg.Checksum) <= 0 {
|
|
|
- t.decodeFailureLogHdl.NeedCRCChecksum()
|
|
|
- return
|
|
|
- }
|
|
|
- decCRC := ascii85.NewDecoder(bytes.NewReader(msg.Checksum))
|
|
|
- crcBytes, err := io.ReadAll(decCRC)
|
|
|
- if err != nil {
|
|
|
- t.decodeFailureLogHdl.DecodeChecksumAscii85Failed(chnum, msg.RawPayload, err)
|
|
|
- return
|
|
|
- }
|
|
|
- crcNum := binary.LittleEndian.Uint32(crcBytes)
|
|
|
- crcABytes := make([]byte, 0, len(payload)+4)
|
|
|
- crcABytes = append(crcABytes, chbyte...)
|
|
|
- crcABytes = append(crcABytes, payload...)
|
|
|
- crcAChecksum := crc32.ChecksumIEEE(crcABytes)
|
|
|
- if crcNum != crcAChecksum {
|
|
|
- t.decodeFailureLogHdl.CRCChecksumMismached(crcAChecksum, crcNum)
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
- omsg := &Message{
|
|
|
- Channel: chnum,
|
|
|
- Payload: payload,
|
|
|
- }
|
|
|
- t.outch <- omsg
|
|
|
-}
|