ringbuffer.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. package backable_ringbuffer
  2. import (
  3. "errors"
  4. "unsafe"
  5. )
  6. var (
  7. ErrTooManyDataToWrite = errors.New("too many data to write")
  8. ErrIsFull = errors.New("ringbuffer is full")
  9. ErrIsEmpty = errors.New("ringbuffer is empty")
  10. ErrOutOfRange = errors.New("index out of range")
  11. ErrAccuqireLock = errors.New("no lock to accquire")
  12. )
  13. type BackableRingBuffer struct {
  14. buf []byte
  15. size int
  16. r int // next position to read
  17. w int // next position to write
  18. isFull bool
  19. mu Mutex
  20. }
  21. func NewBackableRingBuffer(size int) *BackableRingBuffer {
  22. b := &BackableRingBuffer{
  23. size: size,
  24. buf: make([]byte, size),
  25. }
  26. return b
  27. }
  28. func (r *BackableRingBuffer) Read(p []byte) (n int, err error) {
  29. if len(p) == 0 {
  30. return 0, nil
  31. }
  32. r.mu.Lock()
  33. n, err = r.read(p)
  34. r.mu.Unlock()
  35. return n, err
  36. }
  37. func (r *BackableRingBuffer) Peep(p []byte) (n int, err error) {
  38. if len(p) == 0 {
  39. return 0, nil
  40. }
  41. r.mu.Lock()
  42. n, err = r.peep(p)
  43. r.mu.Unlock()
  44. return n, err
  45. }
  46. func (r *BackableRingBuffer) TryRead(p []byte) (n int, err error) {
  47. if len(p) == 0 {
  48. return 0, nil
  49. }
  50. ok := r.mu.TryLock()
  51. if !ok {
  52. return 0, ErrAccuqireLock
  53. }
  54. n, err = r.read(p)
  55. r.mu.Unlock()
  56. return n, err
  57. }
  58. func (r *BackableRingBuffer) TryPeep(p []byte) (n int, err error) {
  59. if len(p) == 0 {
  60. return 0, nil
  61. }
  62. ok := r.mu.TryLock()
  63. if !ok {
  64. return 0, ErrAccuqireLock
  65. }
  66. n, err = r.peep(p)
  67. r.mu.Unlock()
  68. return n, err
  69. }
  70. func (r *BackableRingBuffer) read(p []byte) (n int, err error) {
  71. if r.w == r.r && !r.isFull {
  72. return 0, ErrIsEmpty
  73. }
  74. if r.w > r.r {
  75. n = r.w - r.r
  76. if n > len(p) {
  77. n = len(p)
  78. }
  79. copy(p, r.buf[r.r:r.r+n])
  80. r.r = (r.r + n) % r.size
  81. return
  82. }
  83. n = r.size - r.r + r.w
  84. if n > len(p) {
  85. n = len(p)
  86. }
  87. if r.r+n <= r.size {
  88. copy(p, r.buf[r.r:r.r+n])
  89. } else {
  90. c1 := r.size - r.r
  91. copy(p, r.buf[r.r:r.size])
  92. c2 := n - c1
  93. copy(p[c1:], r.buf[0:c2])
  94. }
  95. r.r = (r.r + n) % r.size
  96. r.isFull = false
  97. return n, err
  98. }
  99. func (r *BackableRingBuffer) peep(p []byte) (n int, err error) {
  100. if r.w == r.r && !r.isFull {
  101. return 0, ErrIsEmpty
  102. }
  103. if r.w > r.r {
  104. n = r.w - r.r
  105. if n > len(p) {
  106. n = len(p)
  107. }
  108. copy(p, r.buf[r.r:r.r+n])
  109. return
  110. }
  111. n = r.size - r.r + r.w
  112. if n > len(p) {
  113. n = len(p)
  114. }
  115. if r.r+n <= r.size {
  116. copy(p, r.buf[r.r:r.r+n])
  117. } else {
  118. c1 := r.size - r.r
  119. copy(p, r.buf[r.r:r.size])
  120. c2 := n - c1
  121. copy(p[c1:], r.buf[0:c2])
  122. }
  123. return n, err
  124. }
  125. func (r *BackableRingBuffer) ReadByte() (b byte, err error) {
  126. r.mu.Lock()
  127. if r.w == r.r && !r.isFull {
  128. r.mu.Unlock()
  129. return 0, ErrIsEmpty
  130. }
  131. b = r.buf[r.r]
  132. r.r++
  133. if r.r == r.size {
  134. r.r = 0
  135. }
  136. r.isFull = false
  137. r.mu.Unlock()
  138. return b, err
  139. }
  140. func (r *BackableRingBuffer) PeepByte() (b byte, err error) {
  141. r.mu.Lock()
  142. if r.w == r.r && !r.isFull {
  143. r.mu.Unlock()
  144. return 0, ErrIsEmpty
  145. }
  146. b = r.buf[r.r]
  147. r.mu.Unlock()
  148. return b, err
  149. }
  150. func (r *BackableRingBuffer) Write(p []byte) (n int, err error) {
  151. if len(p) == 0 {
  152. return 0, nil
  153. }
  154. r.mu.Lock()
  155. n, err = r.write(p)
  156. r.mu.Unlock()
  157. return n, err
  158. }
  159. func (r *BackableRingBuffer) WriteBack(p []byte) (n int, err error) {
  160. if len(p) == 0 {
  161. return 0, nil
  162. }
  163. r.mu.Lock()
  164. n, err = r.writeBack(p)
  165. r.mu.Unlock()
  166. return n, err
  167. }
  168. func (r *BackableRingBuffer) TryWrite(p []byte) (n int, err error) {
  169. if len(p) == 0 {
  170. return 0, nil
  171. }
  172. ok := r.mu.TryLock()
  173. if !ok {
  174. return 0, ErrAccuqireLock
  175. }
  176. n, err = r.write(p)
  177. r.mu.Unlock()
  178. return n, err
  179. }
  180. func (r *BackableRingBuffer) TryWriteBack(p []byte) (n int, err error) {
  181. if len(p) == 0 {
  182. return 0, nil
  183. }
  184. ok := r.mu.TryLock()
  185. if !ok {
  186. return 0, ErrAccuqireLock
  187. }
  188. n, err = r.writeBack(p)
  189. r.mu.Unlock()
  190. return n, err
  191. }
  192. func (r *BackableRingBuffer) write(p []byte) (n int, err error) {
  193. if r.isFull {
  194. return 0, ErrIsFull
  195. }
  196. var avail int
  197. if r.w >= r.r {
  198. avail = r.size - r.w + r.r
  199. } else {
  200. avail = r.r - r.w
  201. }
  202. if len(p) > avail {
  203. err = ErrTooManyDataToWrite
  204. p = p[:avail]
  205. }
  206. n = len(p)
  207. if r.w >= r.r {
  208. c1 := r.size - r.w
  209. if c1 >= n {
  210. copy(r.buf[r.w:], p)
  211. r.w += n
  212. } else {
  213. copy(r.buf[r.w:], p[:c1])
  214. c2 := n - c1
  215. copy(r.buf[0:], p[c1:])
  216. r.w = c2
  217. }
  218. } else {
  219. copy(r.buf[r.w:], p)
  220. r.w += n
  221. }
  222. if r.w == r.size {
  223. r.w = 0
  224. }
  225. if r.w == r.r {
  226. r.isFull = true
  227. }
  228. return n, err
  229. }
  230. func (r *BackableRingBuffer) writeBack(p []byte) (n int, err error) {
  231. if r.isFull {
  232. return 0, ErrIsFull
  233. }
  234. var avail int
  235. if r.w >= r.r {
  236. avail = r.size - r.w + r.r
  237. } else {
  238. avail = r.r - r.w
  239. }
  240. if len(p) > avail {
  241. err = ErrTooManyDataToWrite
  242. p = p[:avail]
  243. }
  244. n = len(p)
  245. if r.w >= r.r {
  246. if n <= r.r {
  247. stp := r.r - n
  248. copy(r.buf[stp:], p)
  249. r.r = r.r - n
  250. } else {
  251. lan := n - r.r
  252. stp1 := r.size - lan
  253. copy(r.buf[stp1:], p[:lan])
  254. copy(r.buf, p[lan:])
  255. r.r = stp1
  256. }
  257. } else {
  258. stp2 := r.r - n
  259. copy(r.buf[stp2:], p)
  260. r.r = stp2
  261. }
  262. if r.w == r.r {
  263. r.isFull = true
  264. }
  265. return n, err
  266. }
  267. func (r *BackableRingBuffer) WriteByte(c byte) error {
  268. r.mu.Lock()
  269. err := r.writeByte(c)
  270. r.mu.Unlock()
  271. return err
  272. }
  273. func (r *BackableRingBuffer) WriteByteBack(c byte) error {
  274. r.mu.Lock()
  275. err := r.writeByteBack(c)
  276. r.mu.Unlock()
  277. return err
  278. }
  279. func (r *BackableRingBuffer) TryWriteByte(c byte) error {
  280. ok := r.mu.TryLock()
  281. if !ok {
  282. return ErrAccuqireLock
  283. }
  284. err := r.writeByte(c)
  285. r.mu.Unlock()
  286. return err
  287. }
  288. func (r *BackableRingBuffer) TryWriteByteBack(c byte) error {
  289. ok := r.mu.TryLock()
  290. if !ok {
  291. return ErrAccuqireLock
  292. }
  293. err := r.writeByteBack(c)
  294. r.mu.Unlock()
  295. return err
  296. }
  297. func (r *BackableRingBuffer) writeByte(c byte) error {
  298. if r.w == r.r && r.isFull {
  299. return ErrIsFull
  300. }
  301. r.buf[r.w] = c
  302. r.w++
  303. if r.w == r.size {
  304. r.w = 0
  305. }
  306. if r.w == r.r {
  307. r.isFull = true
  308. }
  309. return nil
  310. }
  311. func (r *BackableRingBuffer) writeByteBack(c byte) error {
  312. if r.w == r.r && r.isFull {
  313. return ErrIsFull
  314. }
  315. pos := r.r - 1
  316. if pos < 0 {
  317. pos = r.size - 1
  318. }
  319. r.buf[pos] = c
  320. r.r = pos
  321. if r.w == r.r {
  322. r.isFull = true
  323. }
  324. return nil
  325. }
  326. func (r *BackableRingBuffer) Length() int {
  327. r.mu.Lock()
  328. defer r.mu.Unlock()
  329. if r.w == r.r {
  330. if r.isFull {
  331. return r.size
  332. }
  333. return 0
  334. }
  335. if r.w > r.r {
  336. return r.w - r.r
  337. }
  338. return r.size - r.r + r.w
  339. }
  340. func (r *BackableRingBuffer) Capacity() int {
  341. return r.size
  342. }
  343. func (r *BackableRingBuffer) Free() int {
  344. r.mu.Lock()
  345. defer r.mu.Unlock()
  346. if r.w == r.r {
  347. if r.isFull {
  348. return 0
  349. }
  350. return r.size
  351. }
  352. if r.w < r.r {
  353. return r.r - r.w
  354. }
  355. return r.size - r.w + r.r
  356. }
  357. func (r *BackableRingBuffer) WriteString(s string) (n int, err error) {
  358. x := (*[2]uintptr)(unsafe.Pointer(&s))
  359. h := [3]uintptr{x[0], x[1], x[1]}
  360. buf := *(*[]byte)(unsafe.Pointer(&h))
  361. return r.Write(buf)
  362. }
  363. func (r *BackableRingBuffer) Bytes() []byte {
  364. r.mu.Lock()
  365. defer r.mu.Unlock()
  366. if r.w == r.r {
  367. if r.isFull {
  368. buf := make([]byte, r.size)
  369. copy(buf, r.buf[r.r:])
  370. copy(buf[r.size-r.r:], r.buf[:r.w])
  371. return buf
  372. }
  373. return nil
  374. }
  375. if r.w > r.r {
  376. buf := make([]byte, r.w-r.r)
  377. copy(buf, r.buf[r.r:r.w])
  378. return buf
  379. }
  380. n := r.size - r.r + r.w
  381. buf := make([]byte, n)
  382. if r.r+n < r.size {
  383. copy(buf, r.buf[r.r:r.r+n])
  384. } else {
  385. c1 := r.size - r.r
  386. copy(buf, r.buf[r.r:r.size])
  387. c2 := n - c1
  388. copy(buf[c1:], r.buf[0:c2])
  389. }
  390. return buf
  391. }
  392. func (r *BackableRingBuffer) PeepByteAt(pos int) (b byte, err error) {
  393. defer r.mu.Unlock()
  394. r.mu.Lock()
  395. if r.w == r.r && !r.isFull {
  396. return 0, ErrIsEmpty
  397. }
  398. if r.r == r.w && pos > 0 {
  399. return 0, ErrOutOfRange
  400. }
  401. if r.r < r.w {
  402. rpos := r.r + pos
  403. b = r.buf[rpos]
  404. return
  405. } else {
  406. if r.r+pos >= r.size {
  407. rpos := r.r + pos - r.size
  408. if rpos >= r.w {
  409. return 0, ErrOutOfRange
  410. }
  411. b = r.buf[rpos]
  412. return
  413. } else {
  414. rpos := r.r + pos
  415. b = r.buf[rpos]
  416. return
  417. }
  418. }
  419. }
  420. func (r *BackableRingBuffer) IsFull() bool {
  421. r.mu.Lock()
  422. defer r.mu.Unlock()
  423. return r.isFull
  424. }
  425. func (r *BackableRingBuffer) IsEmpty() bool {
  426. r.mu.Lock()
  427. defer r.mu.Unlock()
  428. return !r.isFull && r.w == r.r
  429. }
  430. func (r *BackableRingBuffer) Reset() {
  431. r.mu.Lock()
  432. defer r.mu.Unlock()
  433. r.r = 0
  434. r.w = 0
  435. r.isFull = false
  436. }