chan.go 1.74 KB
Newer Older
1 2 3 4
package msgio

import (
	"io"
Jeromy's avatar
Jeromy committed
5
	"sync"
6 7 8 9 10 11 12
)

type Chan struct {
	Buffers   [][]byte
	MsgChan   chan []byte
	ErrChan   chan error
	CloseChan chan bool
Jeromy's avatar
Jeromy committed
13
	BufPool   *sync.Pool
14 15
}

Jeromy's avatar
Jeromy committed
16 17 18 19 20 21 22 23 24
func NewChan(chanSize int) *Chan {
	return &Chan{
		MsgChan:   make(chan []byte, chanSize),
		ErrChan:   make(chan error, 1),
		CloseChan: make(chan bool, 2),
	}
}

func NewChanWithPool(chanSize int, pool *sync.Pool) *Chan {
25 26 27 28
	return &Chan{
		MsgChan:   make(chan []byte, chanSize),
		ErrChan:   make(chan error, 1),
		CloseChan: make(chan bool, 2),
Jeromy's avatar
Jeromy committed
29
		BufPool:   pool,
30 31 32 33 34 35
	}
}

func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) {
	// new buffer per message
	// if bottleneck, cycle around a set of buffers
Jeromy's avatar
Jeromy committed
36
	mr := NewReader(r, s.BufPool)
Jeromy's avatar
Jeromy committed
37 38 39 40 41 42
	if s.BufPool == nil {
		s.BufPool = new(sync.Pool)
		s.BufPool.New = func() interface{} {
			return make([]byte, maxMsgLen)
		}
	}
43 44
Loop:
	for {
Jeromy's avatar
Jeromy committed
45
		buf, err := mr.ReadMsg()
46 47 48 49 50 51 52 53 54 55 56 57 58
		if err != nil {
			if err == io.EOF {
				break Loop // done
			}

			// unexpected error. tell the client.
			s.ErrChan <- err
			break Loop
		}

		select {
		case <-s.CloseChan:
			break Loop // told we're done
Jeromy's avatar
Jeromy committed
59
		case s.MsgChan <- buf:
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
			// ok seems fine. send it away
		}
	}

	close(s.MsgChan)
	// signal we're done
	s.CloseChan <- true
}

func (s *Chan) WriteTo(w io.Writer) {
	// new buffer per message
	// if bottleneck, cycle around a set of buffers
	mw := NewWriter(w)
Loop:
	for {
		select {
		case <-s.CloseChan:
			break Loop // told we're done

		case msg, ok := <-s.MsgChan:
			if !ok { // chan closed
				break Loop
			}

			if err := mw.WriteMsg(msg); err != nil {
				if err != io.EOF {
					// unexpected error. tell the client.
					s.ErrChan <- err
				}

				break Loop
			}
		}
	}

	// signal we're done
	s.CloseChan <- true
}

func (s *Chan) Close() {
	s.CloseChan <- true
}