123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- package zmostp_go
- import (
- "bytes"
- "context"
- "encoding/ascii85"
- "encoding/binary"
- "fmt"
- "hash/crc32"
- "io"
- "sync"
- )
- type Transporter struct {
- reader io.ByteReader
- writer io.Writer
- outch chan *Message
- inch chan *Message
- humanFriendlySending bool
- optLock sync.RWMutex
- dfa *DecoderDFA
- decodeFailureLogHdl DecodeFailureLogHandler
- enableCRC bool
- }
- func NewTransporter(cfg *TransporterConfig) *Transporter {
- t := &Transporter{
- reader: cfg.Reader,
- writer: cfg.Writer,
- outch: make(chan *Message),
- inch: make(chan *Message),
- humanFriendlySending: cfg.HumanFriendlySending,
- optLock: sync.RWMutex{},
- enableCRC: cfg.EnableCRC,
- }
- if cfg.DecodeFailureLog == nil {
- t.decodeFailureLogHdl = &EmptyDecodeFailureLogHandler{}
- t.dfa = NewDecoderDFA(&EmptyDecodeFailureLogHandler{})
- } else {
- t.decodeFailureLogHdl = cfg.DecodeFailureLog
- t.dfa = NewDecoderDFA(cfg.DecodeFailureLog)
- }
- return t
- }
- func (t *Transporter) GetMessageInChannel() chan *Message {
- return t.inch
- }
- func (t *Transporter) GetMessageOutChannel() chan *Message {
- return t.outch
- }
- func (t *Transporter) Run(ctx context.Context) error {
- t.dfa.Reset()
- var errSend, errRecv, errPipeCopy error
- wg := &sync.WaitGroup{}
- wg.Add(2)
- pctx, cncl := context.WithCancel(ctx)
- go func() {
- errSend = t.doSend(pctx)
- cncl()
- wg.Done()
- }()
- go func() {
- errRecv = t.doRecv(pctx)
- cncl()
- wg.Done()
- }()
- wg.Wait()
- if errSend != nil {
- return errSend
- }
- if errRecv != nil {
- return errRecv
- }
- if errPipeCopy != nil {
- if errPipeCopy == io.EOF {
- return nil
- }
- return errPipeCopy
- }
- return nil
- }
- func (t *Transporter) doRecv(ctx context.Context) error {
- recvErrCh := make(chan error)
- recvByteCh := make(chan byte)
- dfaOutCh := t.dfa.GetOutChannel()
- go func() {
- for {
- b, err := t.reader.ReadByte()
- if err != nil {
- if err == io.EOF {
- recvErrCh <- nil
- return
- } else {
- recvErrCh <- err
- return
- }
- }
- recvByteCh <- b
- }
- }()
- for {
- select {
- case <-ctx.Done():
- close(recvErrCh)
- close(recvByteCh)
- close(t.outch)
- t.dfa.Shutdown()
- return nil
- case e, ok := <-recvErrCh:
- if !ok {
- return nil
- }
- return fmt.Errorf("failed recv from stream: %w", e)
- case b, ok := <-recvByteCh:
- if !ok {
- return nil
- }
- t.dfa.PushByte(b)
- case dom, ok := <-dfaOutCh:
- if !ok {
- return nil
- }
- t.decodeMessageFromDFAOut(dom)
- }
- }
- }
- func (t *Transporter) doSend(ctx context.Context) error {
- for {
- select {
- case <-ctx.Done():
- return nil
- case out, ok := <-t.inch:
- if !ok {
- return nil
- }
- err := t.encodeSendMessage(out, t.writer)
- if err != nil {
- return err
- }
- }
- }
- }
- func (t *Transporter) SetHumanFriendOutputOption(b bool) {
- defer t.optLock.Unlock()
- t.optLock.Lock()
- t.humanFriendlySending = b
- }
- func (t *Transporter) encodeSendMessage(msg *Message, w io.Writer) error {
- defer t.optLock.RUnlock()
- t.optLock.RLock()
- var buf *bytes.Buffer
- if t.humanFriendlySending {
- buf = bytes.NewBufferString("{\n\t")
- } else {
- buf = bytes.NewBuffer([]byte{'{'})
- }
- asc85end := ascii85.NewEncoder(buf)
- chnBytes := binary.LittleEndian.AppendUint32([]byte{}, msg.Channel)
- _, err := asc85end.Write(chnBytes)
- if err != nil {
- return fmt.Errorf("failed to encode chnum of message: ascii85 encoder write error: %w", err)
- }
- err = asc85end.Close()
- if err != nil {
- return fmt.Errorf("failed to encode chnum of message: ascii85 encoder close error: %w", err)
- }
- if t.humanFriendlySending {
- buf.WriteString("\n\t|\n\t")
- } else {
- buf.WriteByte('|')
- }
- asc85end = ascii85.NewEncoder(buf)
- _, err = asc85end.Write(msg.Payload)
- if err != nil {
- return fmt.Errorf("failed to encode payload of message: ascii85 encoder write error: %w", err)
- }
- err = asc85end.Close()
- if err != nil {
- return fmt.Errorf("failed to encode payload of message: ascii85 encoder close error: %w", err)
- }
- if t.enableCRC {
- if t.humanFriendlySending {
- buf.WriteString("\n\t|\n\t")
- } else {
- buf.WriteByte('|')
- }
- crcBytes := make([]byte, 0, len(msg.Payload)+4)
- crcBytes = append(crcBytes, chnBytes...)
- crcBytes = append(crcBytes, msg.Payload...)
- crcChecksum := crc32.ChecksumIEEE(crcBytes)
- crcChsBytes := binary.LittleEndian.AppendUint32([]byte{}, crcChecksum)
- asc85end = ascii85.NewEncoder(buf)
- _, err = asc85end.Write(crcChsBytes)
- if err != nil {
- return fmt.Errorf("failed to encode checksum of message: ascii85 encoder write error: %w", err)
- }
- err = asc85end.Close()
- if err != nil {
- return fmt.Errorf("failed to encode checksum of message: ascii85 encoder close error: %w", err)
- }
- }
- if t.humanFriendlySending {
- buf.WriteString("\n}\n")
- } else {
- buf.WriteByte('}')
- }
- _, err = w.Write(buf.Bytes())
- if err != nil {
- return fmt.Errorf("failed to write message to stream: %w", err)
- }
- return nil
- }
- func (t *Transporter) decodeMessageFromDFAOut(msg *DFAOutMsg) {
- decChannel := ascii85.NewDecoder(bytes.NewReader(msg.RawChannel))
- decPayload := ascii85.NewDecoder(bytes.NewReader(msg.RawPayload))
- chbyte, err := io.ReadAll(decChannel)
- if err != nil {
- t.decodeFailureLogHdl.DecodeChannelAscii85Failed(msg.RawChannel, err)
- return
- }
- chnum := binary.LittleEndian.Uint32(chbyte)
- payload, err := io.ReadAll(decPayload)
- if err != nil {
- t.decodeFailureLogHdl.DecodePayloadAscii85Failed(chnum, msg.RawPayload, err)
- return
- }
- if t.enableCRC {
- if len(msg.Checksum) <= 0 {
- t.decodeFailureLogHdl.NeedCRCChecksum()
- return
- }
- decCRC := ascii85.NewDecoder(bytes.NewReader(msg.Checksum))
- crcBytes, err := io.ReadAll(decCRC)
- if err != nil {
- t.decodeFailureLogHdl.DecodeChecksumAscii85Failed(chnum, msg.RawPayload, err)
- return
- }
- crcNum := binary.LittleEndian.Uint32(crcBytes)
- crcABytes := make([]byte, 0, len(payload)+4)
- crcABytes = append(crcABytes, chbyte...)
- crcABytes = append(crcABytes, payload...)
- crcAChecksum := crc32.ChecksumIEEE(crcABytes)
- if crcNum != crcAChecksum {
- t.decodeFailureLogHdl.CRCChecksumMismached(crcAChecksum, crcNum)
- return
- }
- }
- omsg := &Message{
- Channel: chnum,
- Payload: payload,
- }
- t.outch <- omsg
- }
|