worker.go 4.17 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
// TODO FIXME name me
package worker

import (
	"container/list"
	"errors"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
11
	ratelimit "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
	blocks "github.com/jbenet/go-ipfs/blocks"
	exchange "github.com/jbenet/go-ipfs/exchange"
	util "github.com/jbenet/go-ipfs/util"
)

var log = util.Logger("blockservice")

var DefaultConfig = Config{
	NumWorkers:       1,
	ClientBufferSize: 0,
	WorkerBufferSize: 0,
}

type Config struct {
	// NumWorkers sets the number of background workers that provide blocks to
	// the exchange.
	NumWorkers int

	// ClientBufferSize allows clients of HasBlock to send up to
	// |ClientBufferSize| blocks without blocking.
	ClientBufferSize int

	// WorkerBufferSize can be used in conjunction with NumWorkers to reduce
	// communication-coordination within the worker.
	WorkerBufferSize int
}

// TODO FIXME name me
type Worker struct {
	// added accepts blocks from client
	added    chan *blocks.Block
	exchange exchange.Interface

	// workQueue is owned by the client worker
	// process manages life-cycle
	process process.Process
}

func NewWorker(e exchange.Interface, c Config) *Worker {
	if c.NumWorkers < 1 {
		c.NumWorkers = 1 // provide a sane default
	}
	w := &Worker{
		exchange: e,
		added:    make(chan *blocks.Block, c.ClientBufferSize),
		process:  process.WithParent(process.Background()), // internal management
	}
	w.start(c)
	return w
}

func (w *Worker) HasBlock(b *blocks.Block) error {
	select {
	case <-w.process.Closed():
		return errors.New("blockservice worker is closed")
	case w.added <- b:
		return nil
	}
}

func (w *Worker) Close() error {
	log.Debug("blockservice provide worker is shutting down...")
	return w.process.Close()
}

func (w *Worker) start(c Config) {

	workerChan := make(chan *blocks.Block, c.WorkerBufferSize)

	// clientWorker handles incoming blocks from |w.added| and sends to
	// |workerChan|. This will never block the client.
	w.process.Go(func(proc process.Process) {
		defer close(workerChan)

		var workQueue BlockList
87 88
		debugInfo := time.NewTicker(5 * time.Second)
		defer debugInfo.Stop()
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
		for {

			// take advantage of the fact that sending on nil channel always
			// blocks so that a message is only sent if a block exists
			sendToWorker := workerChan
			nextBlock := workQueue.Pop()
			if nextBlock == nil {
				sendToWorker = nil
			}

			select {

			// if worker is ready and there's a block to process, send the
			// block
			case sendToWorker <- nextBlock:
104
			case <-debugInfo.C:
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
				if workQueue.Len() > 0 {
					log.Debugf("%d blocks in blockservice provide queue...", workQueue.Len())
				}
			case block := <-w.added:
				if nextBlock != nil {
					workQueue.Push(nextBlock) // missed the chance to send it
				}
				// if the client sends another block, add it to the queue.
				workQueue.Push(block)
			case <-proc.Closing():
				return
			}
		}
	})

Brian Tiger Chow's avatar
Brian Tiger Chow committed
120 121
	// reads from |workerChan| until process closes
	w.process.Go(func(proc process.Process) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
122
		ctx := childContext(proc) // shut down in-progress HasBlock when time to die
Brian Tiger Chow's avatar
Brian Tiger Chow committed
123
		limiter := ratelimit.NewRateLimiter(proc, c.NumWorkers)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
124 125 126 127 128 129
		for {
			select {
			case <-proc.Closing():
				return
			case block, ok := <-workerChan:
				if !ok {
130
					return
Brian Tiger Chow's avatar
Brian Tiger Chow committed
131
				}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
132 133
				limiter.LimitedGo(func(proc process.Process) {
					if err := w.exchange.HasBlock(ctx, block); err != nil {
Brian Tiger Chow's avatar
log err  
Brian Tiger Chow committed
134
						log.Infof("blockservice worker error: %s", err)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
135 136
					}
				})
137
			}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
138 139
		}
	})
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
}

type BlockList struct {
	list list.List
}

func (s *BlockList) PushFront(b *blocks.Block) {
	s.list.PushFront(b)
}

func (s *BlockList) Push(b *blocks.Block) {
	s.list.PushBack(b)
}

func (s *BlockList) Pop() *blocks.Block {
	if s.list.Len() == 0 {
		return nil
	}
	e := s.list.Front()
	s.list.Remove(e)
	return e.Value.(*blocks.Block)
}

func (s *BlockList) Len() int {
	return s.list.Len()
}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180

// TODO extract
type waitable interface {
	Closing() <-chan struct{}
}

// TODO extract
func childContext(w waitable) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-w.Closing()
		cancel()
	}()
	return ctx
}