Browse Source

2020-05-04 02:10

zry 4 years ago
parent
commit
d1cbb58f6f

+ 2 - 0
.idea/.gitignore

@@ -0,0 +1,2 @@
+# Default ignored files
+/workspace.xml

+ 5 - 0
.idea/inspectionProfiles/profiles_settings.xml

@@ -0,0 +1,5 @@
+<component name="InspectionProjectProfileManager">
+  <settings>
+    <option name="PROJECT_PROFILE" />
+  </settings>
+</component>

+ 6 - 0
.idea/misc.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="JavaScriptSettings">
+    <option name="languageLevel" value="ES6" />
+  </component>
+</project>

+ 8 - 0
.idea/modules.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/.idea/zioutil.iml" filepath="$PROJECT_DIR$/.idea/zioutil.iml" />
+    </modules>
+  </component>
+</project>

+ 6 - 0
.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>

+ 8 - 0
.idea/zioutil.iml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="WEB_MODULE" version="4">
+  <component name="NewModuleRootManager">
+    <content url="file://$MODULE_DIR$" />
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>

+ 145 - 0
batch_transporter.go

@@ -0,0 +1,145 @@
+package zioutil
+
+// Inspired by transport implementation of buptczq/gotun.
+// Some code was copied from buptczq/gotun.
+
+import (
+	"context"
+	"io"
+	"strings"
+	"sync"
+	"time"
+)
+
+type BatchTransporterGlobal struct {
+	bufferPool sync.Pool
+}
+
+func NewBatchTransporterGlobal(bufsize int) *BatchTransporterGlobal {
+	return &BatchTransporterGlobal{
+		bufferPool: sync.Pool{New: func() interface{} {
+			return make([]byte, bufsize)
+		}},
+	}
+}
+
+type BatchTransporter struct {
+	btg                    *BatchTransporterGlobal
+	rwc1, rwc2             io.ReadWriteCloser
+	ReadTime12, ReadTime21 time.Time
+	Error12, Error21       error
+	Bytes12, Bytes21       int64
+	Start, Stop            time.Time
+}
+
+func NewBatchTrasporter(global *BatchTransporterGlobal, start time.Time, rwc1, rwc2 io.ReadWriteCloser) *BatchTransporter {
+	return &BatchTransporter{
+		btg:        global,
+		rwc1:       rwc1,
+		rwc2:       rwc2,
+		ReadTime12: time.Now(),
+		ReadTime21: time.Now(),
+		Start:      start,
+	}
+}
+
+func (bt *BatchTransporter) copyBuffer(ctx context.Context, dst io.Writer, src io.Reader, buf []byte, is12 bool) (cnt int64, err error) {
+	nftc := false
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+		nr, errr := src.Read(buf)
+		if !nftc {
+			nftc = true
+			if is12 {
+				bt.ReadTime12 = time.Now()
+			} else {
+				bt.ReadTime21 = time.Now()
+			}
+		}
+		if nr > 0 {
+			nw, errw := dst.Write(buf[0:nr])
+			if nw > 0 {
+				cnt += int64(nw)
+			}
+			if errw != nil {
+				err = errw
+				break
+			}
+			if nr != nw {
+				err = io.ErrShortWrite
+				break
+			}
+		}
+		if errr != nil {
+			if errr != io.EOF {
+				err = errr
+			}
+			break
+		}
+	}
+	return cnt, err
+}
+
+func (bt *BatchTransporter) Copy() {
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := &sync.WaitGroup{}
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		buf := bt.btg.bufferPool.Get().([]byte)
+		defer bt.btg.bufferPool.Put(buf)
+		bt.Bytes12, bt.Error12 = bt.copyBuffer(ctx, bt.rwc2, bt.rwc1, buf, true)
+		cancel()
+		bt.rwc2.Close()
+	}()
+	go func() {
+		defer wg.Done()
+		buf := bt.btg.bufferPool.Get().([]byte)
+		defer bt.btg.bufferPool.Put(buf)
+		bt.Bytes21, bt.Error21 = bt.copyBuffer(ctx, bt.rwc1, bt.rwc2, buf, false)
+		cancel()
+		bt.rwc1.Close()
+	}()
+	wg.Wait()
+	bt.Stop = time.Now()
+}
+
+func filterNetworkConnClosedError(err error) error {
+	if err == nil {
+		return nil
+	}
+	if strings.Contains(err.Error(), "use of closed network connection") {
+		return nil
+	}
+	return err
+}
+
+func (bt *BatchTransporter) NoNetworkConnClosedError() error {
+	err12 := filterNetworkConnClosedError(bt.Error12)
+	err21 := filterNetworkConnClosedError(bt.Error21)
+	if err12 == nil {
+		err12 = err21
+	}
+	return err12
+}
+
+func (bt *BatchTransporter) PickError() error {
+	err12 := bt.Error12
+	err21 := bt.Error21
+	if err12 == nil {
+		err12 = err21
+	}
+	return err12
+}
+
+func (bt *BatchTransporter) GetError12() error {
+	return bt.Error12
+}
+
+func (bt *BatchTransporter) GetError21() error {
+	return bt.Error21
+}

+ 65 - 0
reader_writer.go

@@ -0,0 +1,65 @@
+package zioutil
+
+import "io"
+
+type ReadWriterBind struct {
+	Reader io.Reader
+	Writer io.Writer
+}
+
+func (rwb ReadWriterBind) Read(b []byte) (n int, err error) {
+	return rwb.Reader.Read(b)
+}
+
+func (rwb ReadWriterBind) Write(b []byte) (n int, err error) {
+	return rwb.Writer.Write(b)
+}
+
+type ReadWriteCloserBind struct {
+	ReadCloser  io.ReadCloser
+	WriteCloser io.WriteCloser
+}
+
+func (rwb ReadWriteCloserBind) Read(b []byte) (n int, err error) {
+	return rwb.ReadCloser.Read(b)
+}
+
+func (rwb ReadWriteCloserBind) Write(b []byte) (n int, err error) {
+	return rwb.WriteCloser.Write(b)
+}
+
+func (rwb ReadWriteCloserBind) Close() error {
+	e1 := rwb.ReadCloser.Close()
+	e2 := rwb.WriteCloser.Close()
+	if e1 != nil {
+		return e1
+	}
+	if e2 != nil {
+		return e2
+	}
+	return nil
+}
+
+type Reader2ReadCloser struct {
+	Reader io.Reader
+}
+
+func (rcb Reader2ReadCloser) Read(b []byte) (n int, err error) {
+	return rcb.Reader.Read(b)
+}
+
+func (rcb Reader2ReadCloser) Close() error {
+	return nil
+}
+
+type Writer2ReadCloser struct {
+	Writer io.Writer
+}
+
+func (wcb Writer2ReadCloser) Write(b []byte) (n int, err error) {
+	return wcb.Writer.Write(b)
+}
+
+func (wcb Writer2ReadCloser) Close() error {
+	return nil
+}

+ 40 - 0
slow_copy.go

@@ -0,0 +1,40 @@
+package zioutil
+
+import (
+	"io"
+	"sync"
+)
+
+func SlowCopy(w io.Writer, r io.Reader, stepsize int) (n int, err error) {
+	rbuf := make([]byte, stepsize)
+	cnt := 0
+	for {
+		n, err := r.Read(rbuf)
+		if n == 0 {
+			continue
+		}
+		if err != nil {
+			return cnt, err
+		}
+		n, err = w.Write(rbuf)
+		if err != nil {
+			return cnt, err
+		}
+		cnt += n
+	}
+}
+
+func SlowBidiCopy(rw1, rw2 io.ReadWriter, stepsize12, stepsize21 int) (n12, n21 int, err12, err21 error) {
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+	go func() {
+		n12, err12 = SlowCopy(rw2, rw1, stepsize12)
+		wg.Done()
+	}()
+	go func() {
+		n21, err21 = SlowCopy(rw1, rw2, stepsize21)
+		wg.Done()
+	}()
+	wg.Wait()
+	return
+}