workers.go 5.24 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
dirkmc's avatar
dirkmc committed
5
	"fmt"
6

Jeromy's avatar
Jeromy committed
7 8
	process "github.com/jbenet/goprocess"
	procctx "github.com/jbenet/goprocess/context"
9 10 11
	engine "gitlab.dms3.io/dms3/go-bitswap/internal/decision"
	pb "gitlab.dms3.io/dms3/go-bitswap/message/pb"
	cid "gitlab.dms3.io/dms3/go-cid"
12
	"go.uber.org/zap"
13 14
)

15 16
// TaskWorkerCount is the total number of simultaneous threads sending
// outgoing messages
17
var TaskWorkerCount = 8
18

19
func (bs *Bitswap) startWorkers(ctx context.Context, px process.Process) {
20

Jeromy's avatar
Jeromy committed
21 22
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
23
		i := i
Jeromy's avatar
Jeromy committed
24
		px.Go(func(px process.Process) {
25
			bs.taskWorker(ctx, i)
Jeromy's avatar
Jeromy committed
26 27
		})
	}
28

29
	if bs.provideEnabled {
30 31 32 33 34 35 36 37 38 39
		// Start up a worker to manage sending out provides messages
		px.Go(func(px process.Process) {
			bs.provideCollector(ctx)
		})

		// Spawn up multiple workers to handle incoming blocks
		// consider increasing number if providing blocks bottlenecks
		// file transfers
		px.Go(bs.provideWorker)
	}
40 41
}

42
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
Jeromy's avatar
Jeromy committed
43
	defer log.Debug("bitswap task worker shutting down...")
44
	log := log.With("ID", id)
45
	for {
46
		log.Debug("Bitswap.TaskWorker.Loop")
47 48 49 50 51 52 53
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
dirkmc's avatar
dirkmc committed
54 55

				// TODO: Only record message as sent if there was no error?
56 57 58
				// Ideally, yes. But we'd need some way to trigger a retry and/or drop
				// the peer.
				bs.engine.MessageSent(envelope.Peer, envelope.Message)
Tomasz Zdybał's avatar
Tomasz Zdybał committed
59 60 61
				if bs.wiretap != nil {
					bs.wiretap.MessageSent(envelope.Peer, envelope.Message)
				}
62
				bs.sendBlocks(ctx, envelope)
63 64 65 66 67 68 69 70 71
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

72
func (bs *Bitswap) logOutgoingBlocks(env *engine.Envelope) {
Steven Allen's avatar
Steven Allen committed
73
	if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil {
74 75
		return
	}
dirkmc's avatar
dirkmc committed
76

Steven Allen's avatar
Steven Allen committed
77 78
	self := bs.network.Self()

dirkmc's avatar
dirkmc committed
79 80 81 82
	for _, blockPresence := range env.Message.BlockPresences() {
		c := blockPresence.Cid
		switch blockPresence.Type {
		case pb.Message_Have:
Steven Allen's avatar
Steven Allen committed
83
			log.Debugw("sent message",
84 85
				"type", "HAVE",
				"cid", c,
Steven Allen's avatar
Steven Allen committed
86 87
				"local", self,
				"to", env.Peer,
88
			)
dirkmc's avatar
dirkmc committed
89
		case pb.Message_DontHave:
Steven Allen's avatar
Steven Allen committed
90
			log.Debugw("sent message",
91 92
				"type", "DONT_HAVE",
				"cid", c,
Steven Allen's avatar
Steven Allen committed
93 94
				"local", self,
				"to", env.Peer,
95
			)
dirkmc's avatar
dirkmc committed
96 97 98 99 100
		default:
			panic(fmt.Sprintf("unrecognized BlockPresence type %v", blockPresence.Type))
		}

	}
101
	for _, block := range env.Message.Blocks() {
Steven Allen's avatar
Steven Allen committed
102
		log.Debugw("sent message",
103 104
			"type", "BLOCK",
			"cid", block.Cid(),
Steven Allen's avatar
Steven Allen committed
105 106
			"local", self,
			"to", env.Peer,
107
		)
108
	}
109
}
110

111 112 113 114 115 116
func (bs *Bitswap) sendBlocks(ctx context.Context, env *engine.Envelope) {
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

	err := bs.network.SendMessage(ctx, env.Peer, env.Message)
117
	if err != nil {
118 119 120 121 122 123 124
		log.Debugw("failed to send blocks message",
			"peer", env.Peer,
			"error", err,
		)
		return
	}

Steven Allen's avatar
Steven Allen committed
125 126
	bs.logOutgoingBlocks(env)

127 128 129 130
	dataSent := 0
	blocks := env.Message.Blocks()
	for _, b := range blocks {
		dataSent += len(b.RawData())
131
	}
132 133 134 135 136 137
	bs.counterLk.Lock()
	bs.counters.blocksSent += uint64(len(blocks))
	bs.counters.dataSent += uint64(dataSent)
	bs.counterLk.Unlock()
	bs.sentHistogram.Observe(float64(env.Message.Size()))
	log.Debugw("sent message", "peer", env.Peer)
138 139
}

140
func (bs *Bitswap) provideWorker(px process.Process) {
141 142 143 144 145
	// FIXME: OnClosingContext returns a _custom_ context type.
	// Unfortunately, deriving a new cancelable context from this custom
	// type fires off a goroutine. To work around this, we create a single
	// cancelable context up-front and derive all sub-contexts from that.
	//
146
	// See: https://gitlab.dms3.io/dms3/go-dms3/issues/5810
147 148 149
	ctx := procctx.OnClosingContext(px)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
150

151
	limit := make(chan struct{}, provideWorkerMax)
152

153
	limitedGoProvide := func(k cid.Cid, wid int) {
154 155 156 157
		defer func() {
			// replace token when done
			<-limit
		}()
158

159 160
		log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
		defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)
161

162 163
		ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
		defer cancel()
164

165
		if err := bs.network.Provide(ctx, k); err != nil {
166
			log.Warn(err)
167
		}
168
	}
169 170 171

	// worker spawner, reads from bs.provideKeys until it closes, spawning a
	// _ratelimited_ number of workers to handle each key.
172
	for wid := 2; ; wid++ {
173
		log.Debug("Bitswap.ProvideWorker.Loop")
174

175 176 177 178 179 180 181 182
		select {
		case <-px.Closing():
			return
		case k, ok := <-bs.provideKeys:
			if !ok {
				log.Debug("provideKeys channel closed")
				return
			}
183 184 185
			select {
			case <-px.Closing():
				return
186 187
			case limit <- struct{}{}:
				go limitedGoProvide(k, wid)
188 189
			}
		}
190
	}
191 192
}

193 194
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
195 196 197
	var toProvide []cid.Cid
	var nextKey cid.Cid
	var keysOut chan cid.Cid
198 199 200

	for {
		select {
201
		case blkey, ok := <-bs.newBlocks:
202 203 204 205
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
206

Jeromy's avatar
Jeromy committed
207
			if keysOut == nil {
208
				nextKey = blkey
Jeromy's avatar
Jeromy committed
209 210
				keysOut = bs.provideKeys
			} else {
211
				toProvide = append(toProvide, blkey)
Jeromy's avatar
Jeromy committed
212 213
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
214 215 216
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
217
			} else {
Jeromy's avatar
Jeromy committed
218
				keysOut = nil
219 220 221 222 223 224
			}
		case <-ctx.Done():
			return
		}
	}
}