worker.go 4.33 KB
Newer Older
1 2 3 4 5 6 7 8
// TODO FIXME name me
package worker

import (
	"container/list"
	"errors"
	"time"

9
	process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
10
	procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
11 12
	ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
	blocks "github.com/ipfs/go-ipfs/blocks"
13
	key "github.com/ipfs/go-ipfs/blocks/key"
14 15
	exchange "github.com/ipfs/go-ipfs/exchange"
	util "github.com/ipfs/go-ipfs/util"
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
)

var log = util.Logger("blockservice")

var DefaultConfig = Config{
	NumWorkers:       1,
	ClientBufferSize: 0,
	WorkerBufferSize: 0,
}

type Config struct {
	// NumWorkers sets the number of background workers that provide blocks to
	// the exchange.
	NumWorkers int

	// ClientBufferSize allows clients of HasBlock to send up to
	// |ClientBufferSize| blocks without blocking.
	ClientBufferSize int

	// WorkerBufferSize can be used in conjunction with NumWorkers to reduce
	// communication-coordination within the worker.
	WorkerBufferSize int
}

// TODO FIXME name me
type Worker struct {
	// added accepts blocks from client
	added    chan *blocks.Block
	exchange exchange.Interface

	// workQueue is owned by the client worker
	// process manages life-cycle
	process process.Process
}

func NewWorker(e exchange.Interface, c Config) *Worker {
	if c.NumWorkers < 1 {
		c.NumWorkers = 1 // provide a sane default
	}
	w := &Worker{
		exchange: e,
		added:    make(chan *blocks.Block, c.ClientBufferSize),
		process:  process.WithParent(process.Background()), // internal management
	}
	w.start(c)
	return w
}

func (w *Worker) HasBlock(b *blocks.Block) error {
	select {
	case <-w.process.Closed():
		return errors.New("blockservice worker is closed")
	case w.added <- b:
		return nil
	}
}

func (w *Worker) Close() error {
	log.Debug("blockservice provide worker is shutting down...")
	return w.process.Close()
}

func (w *Worker) start(c Config) {

	workerChan := make(chan *blocks.Block, c.WorkerBufferSize)

	// clientWorker handles incoming blocks from |w.added| and sends to
	// |workerChan|. This will never block the client.
	w.process.Go(func(proc process.Process) {
		defer close(workerChan)

		var workQueue BlockList
88 89
		debugInfo := time.NewTicker(5 * time.Second)
		defer debugInfo.Stop()
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
		for {

			// take advantage of the fact that sending on nil channel always
			// blocks so that a message is only sent if a block exists
			sendToWorker := workerChan
			nextBlock := workQueue.Pop()
			if nextBlock == nil {
				sendToWorker = nil
			}

			select {

			// if worker is ready and there's a block to process, send the
			// block
			case sendToWorker <- nextBlock:
105
			case <-debugInfo.C:
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
				if workQueue.Len() > 0 {
					log.Debugf("%d blocks in blockservice provide queue...", workQueue.Len())
				}
			case block := <-w.added:
				if nextBlock != nil {
					workQueue.Push(nextBlock) // missed the chance to send it
				}
				// if the client sends another block, add it to the queue.
				workQueue.Push(block)
			case <-proc.Closing():
				return
			}
		}
	})

121 122 123
	// reads from |workerChan| until w.process closes
	limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers)
	limiter.Go(func(proc process.Process) {
124
		ctx := procctx.OnClosingContext(proc) // shut down in-progress HasBlock when time to die
Brian Tiger Chow's avatar
Brian Tiger Chow committed
125 126 127 128 129 130
		for {
			select {
			case <-proc.Closing():
				return
			case block, ok := <-workerChan:
				if !ok {
131
					return
Brian Tiger Chow's avatar
Brian Tiger Chow committed
132
				}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
133 134
				limiter.LimitedGo(func(proc process.Process) {
					if err := w.exchange.HasBlock(ctx, block); err != nil {
Brian Tiger Chow's avatar
log err  
Brian Tiger Chow committed
135
						log.Infof("blockservice worker error: %s", err)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
136 137
					}
				})
138
			}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
139 140
		}
	})
141 142 143
}

type BlockList struct {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
144
	list    list.List
145
	uniques map[key.Key]*list.Element
146 147 148
}

func (s *BlockList) PushFront(b *blocks.Block) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
149
	if s.uniques == nil {
150
		s.uniques = make(map[key.Key]*list.Element)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
151 152 153 154 155 156
	}
	_, ok := s.uniques[b.Key()]
	if !ok {
		e := s.list.PushFront(b)
		s.uniques[b.Key()] = e
	}
157 158 159
}

func (s *BlockList) Push(b *blocks.Block) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
160
	if s.uniques == nil {
161
		s.uniques = make(map[key.Key]*list.Element)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
162 163 164 165 166 167
	}
	_, ok := s.uniques[b.Key()]
	if !ok {
		e := s.list.PushBack(b)
		s.uniques[b.Key()] = e
	}
168 169 170 171 172 173 174 175
}

func (s *BlockList) Pop() *blocks.Block {
	if s.list.Len() == 0 {
		return nil
	}
	e := s.list.Front()
	s.list.Remove(e)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
176 177 178
	b := e.Value.(*blocks.Block)
	delete(s.uniques, b.Key())
	return b
179 180 181 182 183
}

func (s *BlockList) Len() int {
	return s.list.Len()
}