transmitter.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package zmostp_go
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/ascii85"
  6. "encoding/binary"
  7. "fmt"
  8. "hash/crc32"
  9. "io"
  10. "sync"
  11. )
  12. type Transmitter struct {
  13. writer io.Writer
  14. inch chan *Message
  15. humanFriendlySending bool
  16. optLock sync.RWMutex
  17. enableCRC bool
  18. }
  19. func NewTransmitter(cfg *TransmitterConfig) *Transmitter {
  20. t := &Transmitter{
  21. writer: cfg.Writer,
  22. inch: make(chan *Message),
  23. humanFriendlySending: cfg.HumanFriendlySending,
  24. optLock: sync.RWMutex{},
  25. enableCRC: cfg.EnableCRC,
  26. }
  27. return t
  28. }
  29. func (t *Transmitter) GetMessageInChannel() chan *Message {
  30. return t.inch
  31. }
  32. func (t *Transmitter) SetHumanFriendOutputOption(b bool) {
  33. defer t.optLock.Unlock()
  34. t.optLock.Lock()
  35. t.humanFriendlySending = b
  36. }
  37. func (t *Transmitter) Run(ctx context.Context) error {
  38. for {
  39. select {
  40. case <-ctx.Done():
  41. return nil
  42. case out, ok := <-t.inch:
  43. if !ok {
  44. return nil
  45. }
  46. err := t.encodeSendMessage(out, t.writer)
  47. if err != nil {
  48. return err
  49. }
  50. }
  51. }
  52. }
  53. func (t *Transmitter) encodeSendMessage(msg *Message, w io.Writer) error {
  54. defer t.optLock.RUnlock()
  55. t.optLock.RLock()
  56. var buf *bytes.Buffer
  57. if t.humanFriendlySending {
  58. buf = bytes.NewBufferString("{\n\t")
  59. } else {
  60. buf = bytes.NewBuffer([]byte{'{'})
  61. }
  62. asc85end := ascii85.NewEncoder(buf)
  63. chnBytes := binary.LittleEndian.AppendUint32([]byte{}, msg.Channel)
  64. _, err := asc85end.Write(chnBytes)
  65. if err != nil {
  66. return fmt.Errorf("failed to encode chnum of message: ascii85 encoder write error: %w", err)
  67. }
  68. err = asc85end.Close()
  69. if err != nil {
  70. return fmt.Errorf("failed to encode chnum of message: ascii85 encoder close error: %w", err)
  71. }
  72. if t.humanFriendlySending {
  73. buf.WriteString("\n\t|\n\t")
  74. } else {
  75. buf.WriteByte('|')
  76. }
  77. asc85end = ascii85.NewEncoder(buf)
  78. _, err = asc85end.Write(msg.Payload)
  79. if err != nil {
  80. return fmt.Errorf("failed to encode payload of message: ascii85 encoder write error: %w", err)
  81. }
  82. err = asc85end.Close()
  83. if err != nil {
  84. return fmt.Errorf("failed to encode payload of message: ascii85 encoder close error: %w", err)
  85. }
  86. if t.enableCRC {
  87. if t.humanFriendlySending {
  88. buf.WriteString("\n\t|\n\t")
  89. } else {
  90. buf.WriteByte('|')
  91. }
  92. crcBytes := make([]byte, 0, len(msg.Payload)+4)
  93. crcBytes = append(crcBytes, chnBytes...)
  94. crcBytes = append(crcBytes, msg.Payload...)
  95. crcChecksum := crc32.ChecksumIEEE(crcBytes)
  96. crcChsBytes := binary.LittleEndian.AppendUint32([]byte{}, crcChecksum)
  97. asc85end = ascii85.NewEncoder(buf)
  98. _, err = asc85end.Write(crcChsBytes)
  99. if err != nil {
  100. return fmt.Errorf("failed to encode checksum of message: ascii85 encoder write error: %w", err)
  101. }
  102. err = asc85end.Close()
  103. if err != nil {
  104. return fmt.Errorf("failed to encode checksum of message: ascii85 encoder close error: %w", err)
  105. }
  106. }
  107. if t.humanFriendlySending {
  108. buf.WriteString("\n}\n")
  109. } else {
  110. buf.WriteByte('}')
  111. }
  112. _, err = w.Write(buf.Bytes())
  113. if err != nil {
  114. return fmt.Errorf("failed to write message to stream: %w", err)
  115. }
  116. return nil
  117. }