workers.go 5.85 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5
	"math/rand"
6
	"sync"
7 8
	"time"

Jeromy's avatar
Jeromy committed
9
	bsmsg "github.com/ipfs/go-bitswap/message"
10

Jeromy's avatar
Jeromy committed
11 12 13 14 15
	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	process "github.com/jbenet/goprocess"
	procctx "github.com/jbenet/goprocess/context"
	peer "github.com/libp2p/go-libp2p-peer"
16 17
)

18
// TaskWorkerCount is the number of taskWorker goroutines that startWorkers
// launches.
var TaskWorkerCount = 8
19

20
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
21 22
	// Start up a worker to handle block requests this node is making
	px.Go(func(px process.Process) {
23
		bs.providerQueryManager(ctx)
24 25
	})

Jeromy's avatar
Jeromy committed
26 27
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
28
		i := i
Jeromy's avatar
Jeromy committed
29
		px.Go(func(px process.Process) {
30
			bs.taskWorker(ctx, i)
Jeromy's avatar
Jeromy committed
31 32
		})
	}
33 34

	// Start up a worker to manage periodically resending our wantlist out to peers
Jeromy's avatar
Jeromy committed
35 36 37
	px.Go(func(px process.Process) {
		bs.rebroadcastWorker(ctx)
	})
38

Jeromy's avatar
Jeromy committed
39
	// Start up a worker to manage sending out provides messages
40 41 42 43
	px.Go(func(px process.Process) {
		bs.provideCollector(ctx)
	})

44 45 46
	// Spawn up multiple workers to handle incoming blocks
	// consider increasing number if providing blocks bottlenecks
	// file transfers
47
	px.Go(bs.provideWorker)
48 49
}

50
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
Jeromy's avatar
Jeromy committed
51
	idmap := logging.LoggableMap{"ID": id}
Jeromy's avatar
Jeromy committed
52
	defer log.Debug("bitswap task worker shutting down...")
53
	for {
54
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
55 56 57 58 59 60 61
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
62 63 64
				// update the BS ledger to reflect sent message
				// TODO: Should only track *useful* messages in ledger
				outgoing := bsmsg.New(false)
65 66 67 68 69 70 71 72 73 74
				for _, block := range envelope.Message.Blocks() {
					log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableF(func() map[string]interface{} {
						return logging.LoggableMap{
							"ID":     id,
							"Target": envelope.Peer.Pretty(),
							"Block":  block.Cid().String(),
						}
					}))
					outgoing.AddBlock(block)
				}
75 76
				bs.engine.MessageSent(envelope.Peer, outgoing)

77
				bs.wm.SendBlocks(ctx, envelope)
Jeromy's avatar
Jeromy committed
78
				bs.counterLk.Lock()
79 80 81 82
				for _, block := range envelope.Message.Blocks() {
					bs.counters.blocksSent++
					bs.counters.dataSent += uint64(len(block.RawData()))
				}
Jeromy's avatar
Jeromy committed
83
				bs.counterLk.Unlock()
84 85 86 87 88 89 90 91 92
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

93 94
func (bs *Bitswap) provideWorker(px process.Process) {

95
	limit := make(chan struct{}, provideWorkerMax)
96

97
	limitedGoProvide := func(k cid.Cid, wid int) {
98 99 100 101
		defer func() {
			// replace token when done
			<-limit
		}()
Jeromy's avatar
Jeromy committed
102
		ev := logging.LoggableMap{"ID": wid}
103

104
		ctx := procctx.OnClosingContext(px) // derive ctx from px
105
		defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done()
106

107 108
		ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
		defer cancel()
109

110
		if err := bs.network.Provide(ctx, k); err != nil {
Jeromy's avatar
Jeromy committed
111
			log.Warning(err)
112
		}
113
	}
114 115 116

	// worker spawner, reads from bs.provideKeys until it closes, spawning a
	// _ratelimited_ number of workers to handle each key.
117 118 119
	for wid := 2; ; wid++ {
		ev := logging.LoggableMap{"ID": 1}
		log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
120

121 122 123 124 125 126 127 128
		select {
		case <-px.Closing():
			return
		case k, ok := <-bs.provideKeys:
			if !ok {
				log.Debug("provideKeys channel closed")
				return
			}
129 130 131
			select {
			case <-px.Closing():
				return
132 133
			case limit <- struct{}{}:
				go limitedGoProvide(k, wid)
134 135
			}
		}
136
	}
137 138
}

139 140
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
141 142 143
	var toProvide []cid.Cid
	var nextKey cid.Cid
	var keysOut chan cid.Cid
144 145 146

	for {
		select {
147
		case blkey, ok := <-bs.newBlocks:
148 149 150 151
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
152

Jeromy's avatar
Jeromy committed
153
			if keysOut == nil {
154
				nextKey = blkey
Jeromy's avatar
Jeromy committed
155 156
				keysOut = bs.provideKeys
			} else {
157
				toProvide = append(toProvide, blkey)
Jeromy's avatar
Jeromy committed
158 159
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
160 161 162
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
163
			} else {
Jeromy's avatar
Jeromy committed
164
				keysOut = nil
165 166 167 168 169 170 171
			}
		case <-ctx.Done():
			return
		}
	}
}

172
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
173 174 175
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

176 177 178 179 180
	broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
	defer broadcastSignal.Stop()

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()
181 182

	for {
183
		log.Event(ctx, "Bitswap.Rebroadcast.idle")
184
		select {
185 186
		case <-tick.C:
			n := bs.wm.wl.Len()
187
			if n > 0 {
188
				log.Debug(n, " keys in bitswap wantlist")
189
			}
190
		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
191
			log.Event(ctx, "Bitswap.Rebroadcast.active")
192 193 194 195
			entries := bs.wm.wl.Entries()
			if len(entries) == 0 {
				continue
			}
196 197 198 199 200

			// TODO: come up with a better strategy for determining when to search
			// for new providers for blocks.
			i := rand.Intn(len(entries))
			bs.findKeys <- &blockRequest{
201
				Cid: entries[i].Cid,
202
				Ctx: ctx,
203 204 205 206 207 208
			}
		case <-parent.Done():
			return
		}
	}
}
209 210 211

func (bs *Bitswap) providerQueryManager(ctx context.Context) {
	var activeLk sync.Mutex
212
	kset := cid.NewSet()
213 214 215 216

	for {
		select {
		case e := <-bs.findKeys:
217 218 219 220 221 222
			select { // make sure its not already cancelled
			case <-e.Ctx.Done():
				continue
			default:
			}

223
			activeLk.Lock()
224
			if kset.Has(e.Cid) {
225 226 227
				activeLk.Unlock()
				continue
			}
228
			kset.Add(e.Cid)
229 230
			activeLk.Unlock()

231
			go func(e *blockRequest) {
232 233
				child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
				defer cancel()
234
				providers := bs.network.FindProvidersAsync(child, e.Cid, maxProvidersPerRequest)
235
				wg := &sync.WaitGroup{}
236
				for p := range providers {
237
					wg.Add(1)
238
					go func(p peer.ID) {
239
						defer wg.Done()
240 241 242 243 244 245
						err := bs.network.ConnectTo(child, p)
						if err != nil {
							log.Debug("failed to connect to provider %s: %s", p, err)
						}
					}(p)
				}
246
				wg.Wait()
247
				activeLk.Lock()
248
				kset.Remove(e.Cid)
249 250 251 252 253 254 255 256
				activeLk.Unlock()
			}(e)

		case <-ctx.Done():
			return
		}
	}
}