123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- package zmostp_go
- import (
- "bytes"
- "context"
- "encoding/ascii85"
- "encoding/binary"
- "fmt"
- "hash/crc32"
- "io"
- "sync"
- )
- type Transmitter struct {
- writer io.Writer
- inch chan *Message
- humanFriendlySending bool
- optLock sync.RWMutex
- enableCRC bool
- }
- func NewTransmitter(cfg *TransmitterConfig) *Transmitter {
- t := &Transmitter{
- writer: cfg.Writer,
- inch: make(chan *Message),
- humanFriendlySending: cfg.HumanFriendlySending,
- optLock: sync.RWMutex{},
- enableCRC: cfg.EnableCRC,
- }
- return t
- }
- func (t *Transmitter) GetMessageInChannel() chan *Message {
- return t.inch
- }
- func (t *Transmitter) SetHumanFriendOutputOption(b bool) {
- defer t.optLock.Unlock()
- t.optLock.Lock()
- t.humanFriendlySending = b
- }
- func (t *Transmitter) Run(ctx context.Context) error {
- for {
- select {
- case <-ctx.Done():
- return nil
- case out, ok := <-t.inch:
- if !ok {
- return nil
- }
- err := t.encodeSendMessage(out, t.writer)
- if err != nil {
- return err
- }
- }
- }
- }
- func (t *Transmitter) encodeSendMessage(msg *Message, w io.Writer) error {
- defer t.optLock.RUnlock()
- t.optLock.RLock()
- var buf *bytes.Buffer
- if t.humanFriendlySending {
- buf = bytes.NewBufferString("{\n\t")
- } else {
- buf = bytes.NewBuffer([]byte{'{'})
- }
- asc85end := ascii85.NewEncoder(buf)
- chnBytes := binary.LittleEndian.AppendUint32([]byte{}, msg.Channel)
- _, err := asc85end.Write(chnBytes)
- if err != nil {
- return fmt.Errorf("failed to encode chnum of message: ascii85 encoder write error: %w", err)
- }
- err = asc85end.Close()
- if err != nil {
- return fmt.Errorf("failed to encode chnum of message: ascii85 encoder close error: %w", err)
- }
- if t.humanFriendlySending {
- buf.WriteString("\n\t|\n\t")
- } else {
- buf.WriteByte('|')
- }
- asc85end = ascii85.NewEncoder(buf)
- _, err = asc85end.Write(msg.Payload)
- if err != nil {
- return fmt.Errorf("failed to encode payload of message: ascii85 encoder write error: %w", err)
- }
- err = asc85end.Close()
- if err != nil {
- return fmt.Errorf("failed to encode payload of message: ascii85 encoder close error: %w", err)
- }
- if t.enableCRC {
- if t.humanFriendlySending {
- buf.WriteString("\n\t|\n\t")
- } else {
- buf.WriteByte('|')
- }
- crcBytes := make([]byte, 0, len(msg.Payload)+4)
- crcBytes = append(crcBytes, chnBytes...)
- crcBytes = append(crcBytes, msg.Payload...)
- crcChecksum := crc32.ChecksumIEEE(crcBytes)
- crcChsBytes := binary.LittleEndian.AppendUint32([]byte{}, crcChecksum)
- asc85end = ascii85.NewEncoder(buf)
- _, err = asc85end.Write(crcChsBytes)
- if err != nil {
- return fmt.Errorf("failed to encode checksum of message: ascii85 encoder write error: %w", err)
- }
- err = asc85end.Close()
- if err != nil {
- return fmt.Errorf("failed to encode checksum of message: ascii85 encoder close error: %w", err)
- }
- }
- if t.humanFriendlySending {
- buf.WriteString("\n}\n")
- } else {
- buf.WriteByte('}')
- }
- _, err = w.Write(buf.Bytes())
- if err != nil {
- return fmt.Errorf("failed to write message to stream: %w", err)
- }
- return nil
- }
|