workers.go 4.24 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5

6
	engine "github.com/ipfs/go-bitswap/decision"
Jeromy's avatar
Jeromy committed
7 8 9 10 11
	bsmsg "github.com/ipfs/go-bitswap/message"
	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	process "github.com/jbenet/goprocess"
	procctx "github.com/jbenet/goprocess/context"
12 13
)

14
// TaskWorkerCount is the number of taskWorker routines that startWorkers
// launches. It is a variable, so it can be changed before startWorkers runs.
var TaskWorkerCount = 8
15

16
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
17

Jeromy's avatar
Jeromy committed
18 19
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
20
		i := i
Jeromy's avatar
Jeromy committed
21
		px.Go(func(px process.Process) {
22
			bs.taskWorker(ctx, i)
Jeromy's avatar
Jeromy committed
23 24
		})
	}
25

Jeromy's avatar
Jeromy committed
26
	// Start up a worker to manage sending out provides messages
27 28 29 30
	px.Go(func(px process.Process) {
		bs.provideCollector(ctx)
	})

31 32 33
	// Spawn up multiple workers to handle incoming blocks
	// consider increasing number if providing blocks bottlenecks
	// file transfers
34
	px.Go(bs.provideWorker)
35 36
}

37
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
Jeromy's avatar
Jeromy committed
38
	idmap := logging.LoggableMap{"ID": id}
Jeromy's avatar
Jeromy committed
39
	defer log.Debug("bitswap task worker shutting down...")
40
	for {
41
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
42 43 44 45 46 47 48
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
49 50 51
				// update the BS ledger to reflect sent message
				// TODO: Should only track *useful* messages in ledger
				outgoing := bsmsg.New(false)
52 53 54 55 56 57 58 59 60 61
				for _, block := range envelope.Message.Blocks() {
					log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableF(func() map[string]interface{} {
						return logging.LoggableMap{
							"ID":     id,
							"Target": envelope.Peer.Pretty(),
							"Block":  block.Cid().String(),
						}
					}))
					outgoing.AddBlock(block)
				}
62 63
				bs.engine.MessageSent(envelope.Peer, outgoing)

64
				bs.sendBlocks(ctx, envelope)
Jeromy's avatar
Jeromy committed
65
				bs.counterLk.Lock()
66 67 68 69
				for _, block := range envelope.Message.Blocks() {
					bs.counters.blocksSent++
					bs.counters.dataSent += uint64(len(block.RawData()))
				}
Jeromy's avatar
Jeromy committed
70
				bs.counterLk.Unlock()
71 72 73 74 75 76 77 78 79
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
// sendBlocks copies every block of env into a fresh message, records the
// total payload size in the sent histogram and sends the message to env.Peer.
// A send failure is only logged. env.Sent is always called on return.
func (bs *Bitswap) sendBlocks(ctx context.Context, env *engine.Envelope) {
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

	var payloadBytes int
	outgoing := bsmsg.New(false)
	for _, blk := range env.Message.Blocks() {
		outgoing.AddBlock(blk)
		payloadBytes += len(blk.RawData())
		log.Infof("Sending block %s to %s", blk, env.Peer)
	}
	bs.sentHistogram.Observe(float64(payloadBytes))

	if err := bs.network.SendMessage(ctx, env.Peer, outgoing); err != nil {
		log.Infof("sendblock error: %s", err)
	}
}

100 101
func (bs *Bitswap) provideWorker(px process.Process) {

102
	limit := make(chan struct{}, provideWorkerMax)
103

104
	limitedGoProvide := func(k cid.Cid, wid int) {
105 106 107 108
		defer func() {
			// replace token when done
			<-limit
		}()
Jeromy's avatar
Jeromy committed
109
		ev := logging.LoggableMap{"ID": wid}
110

111
		ctx := procctx.OnClosingContext(px) // derive ctx from px
112
		defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done()
113

114 115
		ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
		defer cancel()
116

117
		if err := bs.network.Provide(ctx, k); err != nil {
Jeromy's avatar
Jeromy committed
118
			log.Warning(err)
119
		}
120
	}
121 122 123

	// worker spawner, reads from bs.provideKeys until it closes, spawning a
	// _ratelimited_ number of workers to handle each key.
124 125 126
	for wid := 2; ; wid++ {
		ev := logging.LoggableMap{"ID": 1}
		log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
127

128 129 130 131 132 133 134 135
		select {
		case <-px.Closing():
			return
		case k, ok := <-bs.provideKeys:
			if !ok {
				log.Debug("provideKeys channel closed")
				return
			}
136 137 138
			select {
			case <-px.Closing():
				return
139 140
			case limit <- struct{}{}:
				go limitedGoProvide(k, wid)
141 142
			}
		}
143
	}
144 145
}

146 147
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
148 149 150
	var toProvide []cid.Cid
	var nextKey cid.Cid
	var keysOut chan cid.Cid
151 152 153

	for {
		select {
154
		case blkey, ok := <-bs.newBlocks:
155 156 157 158
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
159

Jeromy's avatar
Jeromy committed
160
			if keysOut == nil {
161
				nextKey = blkey
Jeromy's avatar
Jeromy committed
162 163
				keysOut = bs.provideKeys
			} else {
164
				toProvide = append(toProvide, blkey)
Jeromy's avatar
Jeromy committed
165 166
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
167 168 169
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
170
			} else {
Jeromy's avatar
Jeromy committed
171
				keysOut = nil
172 173 174 175 176 177
			}
		case <-ctx.Done():
			return
		}
	}
}