package msgio

import (
	"io"
	"sync"
)

// Chan bundles the channels that ReadFrom and WriteTo use to move messages
// and to report errors and shutdown.
type Chan struct {
	// Buffers is not set by the constructors.
	// NOTE(review): appears unused by the methods on Chan; confirm before removing.
	Buffers [][]byte

	// MsgChan carries message payloads. ReadFrom sends on it (and closes it
	// when done); WriteTo receives from it.
	MsgChan chan []byte

	// ErrChan receives errors other than io.EOF hit by ReadFrom or WriteTo.
	ErrChan chan error

	// CloseChan is used in both directions: Close sends on it to request a
	// stop, and ReadFrom / WriteTo send on it when they finish.
	CloseChan chan bool

	// BufPool, when non-nil, is where getBuffer takes read buffers from.
	// Items in the pool must be []byte.
	BufPool *sync.Pool
}

Jeromy's avatar
Jeromy committed
16 17 18 19 20 21 22 23 24
func NewChan(chanSize int) *Chan {
	return &Chan{
		MsgChan:   make(chan []byte, chanSize),
		ErrChan:   make(chan error, 1),
		CloseChan: make(chan bool, 2),
	}
}

func NewChanWithPool(chanSize int, pool *sync.Pool) *Chan {
25 26 27 28
	return &Chan{
		MsgChan:   make(chan []byte, chanSize),
		ErrChan:   make(chan error, 1),
		CloseChan: make(chan bool, 2),
Jeromy's avatar
Jeromy committed
29
		BufPool:   pool,
30 31 32
	}
}

Jeromy's avatar
Jeromy committed
33
func (s *Chan) getBuffer(size int) []byte {
Jeromy's avatar
Jeromy committed
34
	if s.BufPool == nil {
Jeromy's avatar
Jeromy committed
35 36 37 38 39 40
		return make([]byte, size)
	} else {
		bufi := s.BufPool.Get()
		buf, ok := bufi.([]byte)
		if !ok {
			panic("Got invalid type from sync pool!")
Jeromy's avatar
Jeromy committed
41
		}
Jeromy's avatar
Jeromy committed
42
		return buf
Jeromy's avatar
Jeromy committed
43
	}
Jeromy's avatar
Jeromy committed
44 45 46 47 48 49
}

func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) {
	// new buffer per message
	// if bottleneck, cycle around a set of buffers
	mr := NewReader(r)
50 51
Loop:
	for {
Jeromy's avatar
Jeromy committed
52 53
		buf := s.getBuffer(maxMsgLen)
		l, err := mr.ReadMsg(buf)
54 55 56 57 58 59 60 61 62 63 64 65 66
		if err != nil {
			if err == io.EOF {
				break Loop // done
			}

			// unexpected error. tell the client.
			s.ErrChan <- err
			break Loop
		}

		select {
		case <-s.CloseChan:
			break Loop // told we're done
Jeromy's avatar
Jeromy committed
67
		case s.MsgChan <- buf[:l]:
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
			// ok seems fine. send it away
		}
	}

	close(s.MsgChan)
	// signal we're done
	s.CloseChan <- true
}

// WriteTo drains s.MsgChan and writes every message to w through a Writer
// created by NewWriter(w).
//
// It stops when a value arrives on CloseChan, when MsgChan is closed, or when
// WriteMsg fails. A failure other than io.EOF is sent on ErrChan before
// stopping. In every case true is sent on CloseChan just before returning.
func (s *Chan) WriteTo(w io.Writer) {
	mw := NewWriter(w)

	for running := true; running; {
		select {
		case <-s.CloseChan:
			// asked to stop
			running = false

		case msg, open := <-s.MsgChan:
			if !open {
				// sender closed the channel; nothing more to write
				running = false
			} else if err := mw.WriteMsg(msg); err != nil {
				if err != io.EOF {
					// unexpected failure: report it to the client
					s.ErrChan <- err
				}
				running = false
			}
		}
	}

	// let the other side know writing has finished
	s.CloseChan <- true
}

// Close requests that a running ReadFrom or WriteTo stop by sending true on
// CloseChan. The send blocks while CloseChan cannot accept another value.
func (s *Chan) Close() {
	const stop = true
	s.CloseChan <- stop
}