workers.go

package bitswap

import (
	"context"
	"fmt"

	engine "github.com/ipfs/go-bitswap/decision"
	bsmsg "github.com/ipfs/go-bitswap/message"
	pb "github.com/ipfs/go-bitswap/message/pb"
	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	process "github.com/jbenet/goprocess"
	procctx "github.com/jbenet/goprocess/context"
)

// TaskWorkerCount is the total number of simultaneous threads sending
// outgoing messages
var TaskWorkerCount = 8

func (bs *Bitswap) startWorkers(ctx context.Context, px process.Process) {

	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
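		// capture the loop variable for use in the goroutine closure below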
		i := i
		px.Go(func(px process.Process) {
			bs.taskWorker(ctx, i)
		})
	}

	if bs.provideEnabled {
		// Start up a worker to manage sending out provides messages
		px.Go(func(px process.Process) {
			bs.provideCollector(ctx)
		})

		// Spawn multiple workers to handle providing incoming blocks;
		// consider increasing the number if providing blocks bottlenecks
		// file transfers.
		px.Go(bs.provideWorker)
	}
}

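// taskWorker pulls outgoing message envelopes from the decision engine's
// outbox, sends them to the target peer, and records what was sent in the
// engine's ledger and in the local send counters.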
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
	idmap := logging.LoggableMap{"ID": id}
	defer log.Debug("bitswap task worker shutting down...")
	for {
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
		select {
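		// Outbox() returns a channel of channels: the outer receive yields a
		// pending envelope channel, and the inner receive yields the envelope
		// itself (a closed inner channel means there is nothing to send).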
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}

				// update the BS ledger to reflect sent message
				// TODO: Should only track *useful* messages in ledger
				outgoing := bsmsg.New(false)
				for _, block := range envelope.Message.Blocks() {
					log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableF(func() map[string]interface{} {
						return logging.LoggableMap{
							"ID":     id,
							"Target": envelope.Peer.Pretty(),
							"Block":  block.Cid().String(),
						}
					}))
					outgoing.AddBlock(block)
				}
				for _, blockPresence := range envelope.Message.BlockPresences() {
					outgoing.AddBlockPresence(blockPresence.Cid, blockPresence.Type)
				}
				// TODO: Only record message as sent if there was no error?
				bs.engine.MessageSent(envelope.Peer, outgoing)

				bs.sendBlocks(ctx, envelope)
				bs.counterLk.Lock()
				for _, block := range envelope.Message.Blocks() {
					bs.counters.blocksSent++
					bs.counters.dataSent += uint64(len(block.RawData()))
				}
				bs.counterLk.Unlock()
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

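// sendBlocks assembles a single message from the envelope's blocks and block
// presences (HAVE / DONT_HAVE) and sends it to the envelope's target peer.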
func (bs *Bitswap) sendBlocks(ctx context.Context, env *engine.Envelope) {
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

	msgSize := 0
	msg := bsmsg.New(false)

	for _, blockPresence := range env.Message.BlockPresences() {
		c := blockPresence.Cid
		switch blockPresence.Type {
		case pb.Message_Have:
			log.Infof("Sending HAVE %s to %s", c.String()[2:8], env.Peer)
		case pb.Message_DontHave:
			log.Infof("Sending DONT_HAVE %s to %s", c.String()[2:8], env.Peer)
		default:
			panic(fmt.Sprintf("unrecognized BlockPresence type %v", blockPresence.Type))
		}

		msgSize += bsmsg.BlockPresenceSize(c)
		msg.AddBlockPresence(c, blockPresence.Type)
	}
	for _, block := range env.Message.Blocks() {
		msgSize += len(block.RawData())
		msg.AddBlock(block)
		log.Infof("Sending block %s to %s", block, env.Peer)
	}

	bs.sentHistogram.Observe(float64(msgSize))
	err := bs.network.SendMessage(ctx, env.Peer, msg)
	if err != nil {
		// log.Infof("sendblock error: %s", err)
		log.Errorf("SendMessage error: %s. size: %d. block-presence length: %d", err, msg.Size(), len(env.Message.BlockPresences()))
	}
	log.Infof("Sent message to %s", env.Peer)
}

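// provideWorker announces newly received blocks to the network, running at
// most provideWorkerMax provide operations at a time.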
func (bs *Bitswap) provideWorker(px process.Process) {
	// FIXME: OnClosingContext returns a _custom_ context type.
	// Unfortunately, deriving a new cancelable context from this custom
	// type fires off a goroutine. To work around this, we create a single
	// cancelable context up-front and derive all sub-contexts from that.
	//
	// See: https://github.com/ipfs/go-ipfs/issues/5810
	ctx := procctx.OnClosingContext(px)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

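	// limit acts as a counting semaphore: a token is taken before each
	// provide goroutine is spawned and returned when it finishes.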
	limit := make(chan struct{}, provideWorkerMax)

	limitedGoProvide := func(k cid.Cid, wid int) {
		defer func() {
			// replace token when done
			<-limit
		}()
		ev := logging.LoggableMap{"ID": wid}

		defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done()

		ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
		defer cancel()

		if err := bs.network.Provide(ctx, k); err != nil {
			log.Warning(err)
		}
	}

	// worker spawner, reads from bs.provideKeys until it closes, spawning a
	// _ratelimited_ number of workers to handle each key.
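	// The spawner loop logs with ID 1; per-key workers are numbered from 2.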
	for wid := 2; ; wid++ {
		ev := logging.LoggableMap{"ID": 1}
		log.Event(ctx, "Bitswap.ProvideWorker.Loop", ev)

		select {
		case <-px.Closing():
			return
		case k, ok := <-bs.provideKeys:
			if !ok {
				log.Debug("provideKeys channel closed")
				return
			}
			select {
			case <-px.Closing():
				return
			case limit <- struct{}{}:
				go limitedGoProvide(k, wid)
			}
		}
	}
}

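// provideCollector buffers CIDs arriving on bs.newBlocks and feeds them to
// bs.provideKeys without blocking the senders of new blocks.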
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
	var toProvide []cid.Cid
	var nextKey cid.Cid
	var keysOut chan cid.Cid
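	// keysOut is nil while nothing is queued; sending on a nil channel blocks
	// forever, which keeps that select case disabled until a key arrives.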

	for {
		select {
		case blkey, ok := <-bs.newBlocks:
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}

			if keysOut == nil {
				nextKey = blkey
				keysOut = bs.provideKeys
			} else {
				toProvide = append(toProvide, blkey)
			}
		case keysOut <- nextKey:
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
			} else {
				keysOut = nil
			}
		case <-ctx.Done():
			return
		}
	}
}