worker.go 4.35 KB
Newer Older
1 2 3 4 5 6 7 8
// TODO FIXME name me
package worker

import (
	"container/list"
	"errors"
	"time"

9
	process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
10
	procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
11 12
	ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
	blocks "github.com/ipfs/go-ipfs/blocks"
13
	key "github.com/ipfs/go-ipfs/blocks/key"
14
	exchange "github.com/ipfs/go-ipfs/exchange"
Jeromy's avatar
Jeromy committed
15 16

	logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
17 18
)

Jeromy's avatar
Jeromy committed
19
var log = logging.Logger("blockservice")
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88

var DefaultConfig = Config{
	NumWorkers:       1,
	ClientBufferSize: 0,
	WorkerBufferSize: 0,
}

type Config struct {
	// NumWorkers sets the number of background workers that provide blocks to
	// the exchange.
	NumWorkers int

	// ClientBufferSize allows clients of HasBlock to send up to
	// |ClientBufferSize| blocks without blocking.
	ClientBufferSize int

	// WorkerBufferSize can be used in conjunction with NumWorkers to reduce
	// communication-coordination within the worker.
	WorkerBufferSize int
}

// TODO FIXME name me
type Worker struct {
	// added accepts blocks from client
	added    chan *blocks.Block
	exchange exchange.Interface

	// workQueue is owned by the client worker
	// process manages life-cycle
	process process.Process
}

func NewWorker(e exchange.Interface, c Config) *Worker {
	if c.NumWorkers < 1 {
		c.NumWorkers = 1 // provide a sane default
	}
	w := &Worker{
		exchange: e,
		added:    make(chan *blocks.Block, c.ClientBufferSize),
		process:  process.WithParent(process.Background()), // internal management
	}
	w.start(c)
	return w
}

func (w *Worker) HasBlock(b *blocks.Block) error {
	select {
	case <-w.process.Closed():
		return errors.New("blockservice worker is closed")
	case w.added <- b:
		return nil
	}
}

func (w *Worker) Close() error {
	log.Debug("blockservice provide worker is shutting down...")
	return w.process.Close()
}

func (w *Worker) start(c Config) {

	workerChan := make(chan *blocks.Block, c.WorkerBufferSize)

	// clientWorker handles incoming blocks from |w.added| and sends to
	// |workerChan|. This will never block the client.
	w.process.Go(func(proc process.Process) {
		defer close(workerChan)

		var workQueue BlockList
89 90
		debugInfo := time.NewTicker(5 * time.Second)
		defer debugInfo.Stop()
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
		for {

			// take advantage of the fact that sending on nil channel always
			// blocks so that a message is only sent if a block exists
			sendToWorker := workerChan
			nextBlock := workQueue.Pop()
			if nextBlock == nil {
				sendToWorker = nil
			}

			select {

			// if worker is ready and there's a block to process, send the
			// block
			case sendToWorker <- nextBlock:
106
			case <-debugInfo.C:
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
				if workQueue.Len() > 0 {
					log.Debugf("%d blocks in blockservice provide queue...", workQueue.Len())
				}
			case block := <-w.added:
				if nextBlock != nil {
					workQueue.Push(nextBlock) // missed the chance to send it
				}
				// if the client sends another block, add it to the queue.
				workQueue.Push(block)
			case <-proc.Closing():
				return
			}
		}
	})

122 123 124
	// reads from |workerChan| until w.process closes
	limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers)
	limiter.Go(func(proc process.Process) {
125
		ctx := procctx.OnClosingContext(proc) // shut down in-progress HasBlock when time to die
Brian Tiger Chow's avatar
Brian Tiger Chow committed
126 127 128 129 130 131
		for {
			select {
			case <-proc.Closing():
				return
			case block, ok := <-workerChan:
				if !ok {
132
					return
Brian Tiger Chow's avatar
Brian Tiger Chow committed
133
				}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
134 135
				limiter.LimitedGo(func(proc process.Process) {
					if err := w.exchange.HasBlock(ctx, block); err != nil {
Brian Tiger Chow's avatar
log err  
Brian Tiger Chow committed
136
						log.Infof("blockservice worker error: %s", err)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
137 138
					}
				})
139
			}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
140 141
		}
	})
142 143 144
}

type BlockList struct {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
145
	list    list.List
146
	uniques map[key.Key]*list.Element
147 148 149
}

func (s *BlockList) PushFront(b *blocks.Block) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
150
	if s.uniques == nil {
151
		s.uniques = make(map[key.Key]*list.Element)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
152 153 154 155 156 157
	}
	_, ok := s.uniques[b.Key()]
	if !ok {
		e := s.list.PushFront(b)
		s.uniques[b.Key()] = e
	}
158 159 160
}

func (s *BlockList) Push(b *blocks.Block) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
161
	if s.uniques == nil {
162
		s.uniques = make(map[key.Key]*list.Element)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
163 164 165 166 167 168
	}
	_, ok := s.uniques[b.Key()]
	if !ok {
		e := s.list.PushBack(b)
		s.uniques[b.Key()] = e
	}
169 170 171 172 173 174 175 176
}

func (s *BlockList) Pop() *blocks.Block {
	if s.list.Len() == 0 {
		return nil
	}
	e := s.list.Front()
	s.list.Remove(e)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
177 178 179
	b := e.Value.(*blocks.Block)
	delete(s.uniques, b.Key())
	return b
180 181 182 183 184
}

func (s *BlockList) Len() int {
	return s.list.Len()
}