workers.go 5.45 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5
	"math/rand"
6
	"sync"
7 8
	"time"

9 10
	process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
	procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
Jeromy's avatar
Jeromy committed
11
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
Jeromy's avatar
Jeromy committed
12
	cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
13
	peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
14 15
)

16
var TaskWorkerCount = 8
17

18
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
19 20
	// Start up a worker to handle block requests this node is making
	px.Go(func(px process.Process) {
21
		bs.providerQueryManager(ctx)
22 23
	})

Jeromy's avatar
Jeromy committed
24 25
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
26
		i := i
Jeromy's avatar
Jeromy committed
27
		px.Go(func(px process.Process) {
28
			bs.taskWorker(ctx, i)
Jeromy's avatar
Jeromy committed
29 30
		})
	}
31 32

	// Start up a worker to manage periodically resending our wantlist out to peers
Jeromy's avatar
Jeromy committed
33 34 35
	px.Go(func(px process.Process) {
		bs.rebroadcastWorker(ctx)
	})
36

Jeromy's avatar
Jeromy committed
37
	// Start up a worker to manage sending out provides messages
38 39 40 41
	px.Go(func(px process.Process) {
		bs.provideCollector(ctx)
	})

42 43 44
	// Spawn up multiple workers to handle incoming blocks
	// consider increasing number if providing blocks bottlenecks
	// file transfers
45
	px.Go(bs.provideWorker)
46 47
}

48
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
Jeromy's avatar
Jeromy committed
49
	idmap := logging.LoggableMap{"ID": id}
50 51
	defer log.Info("bitswap task worker shutting down...")
	for {
52
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
53 54 55 56 57 58 59
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
Jeromy's avatar
Jeromy committed
60
				log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableMap{
61 62
					"ID":     id,
					"Target": envelope.Peer.Pretty(),
63
					"Block":  envelope.Block.Cid().String(),
64
				})
65

66
				bs.wm.SendBlock(ctx, envelope)
67 68 69 70 71 72 73 74 75
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

76 77
func (bs *Bitswap) provideWorker(px process.Process) {

78
	limit := make(chan struct{}, provideWorkerMax)
79

80
	limitedGoProvide := func(k *cid.Cid, wid int) {
81 82 83 84
		defer func() {
			// replace token when done
			<-limit
		}()
Jeromy's avatar
Jeromy committed
85
		ev := logging.LoggableMap{"ID": wid}
86

87
		ctx := procctx.OnClosingContext(px) // derive ctx from px
88
		defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done()
89

90 91
		ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
		defer cancel()
92

93
		if err := bs.network.Provide(ctx, k); err != nil {
Jeromy's avatar
Jeromy committed
94
			log.Warning(err)
95
		}
96
	}
97 98 99

	// worker spawner, reads from bs.provideKeys until it closes, spawning a
	// _ratelimited_ number of workers to handle each key.
100 101 102
	for wid := 2; ; wid++ {
		ev := logging.LoggableMap{"ID": 1}
		log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
103

104 105 106 107 108 109 110 111
		select {
		case <-px.Closing():
			return
		case k, ok := <-bs.provideKeys:
			if !ok {
				log.Debug("provideKeys channel closed")
				return
			}
112 113 114
			select {
			case <-px.Closing():
				return
115 116
			case limit <- struct{}{}:
				go limitedGoProvide(k, wid)
117 118
			}
		}
119
	}
120 121
}

122 123
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
124 125 126
	var toProvide []*cid.Cid
	var nextKey *cid.Cid
	var keysOut chan *cid.Cid
127 128 129

	for {
		select {
130
		case blkey, ok := <-bs.newBlocks:
131 132 133 134
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
135

Jeromy's avatar
Jeromy committed
136
			if keysOut == nil {
137
				nextKey = blkey
Jeromy's avatar
Jeromy committed
138 139
				keysOut = bs.provideKeys
			} else {
140
				toProvide = append(toProvide, blkey)
Jeromy's avatar
Jeromy committed
141 142
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
143 144 145
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
146
			} else {
Jeromy's avatar
Jeromy committed
147
				keysOut = nil
148 149 150 151 152 153 154
			}
		case <-ctx.Done():
			return
		}
	}
}

155
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
156 157 158
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

159 160 161 162 163
	broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
	defer broadcastSignal.Stop()

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()
164 165

	for {
166
		log.Event(ctx, "Bitswap.Rebroadcast.idle")
167
		select {
168 169
		case <-tick.C:
			n := bs.wm.wl.Len()
170
			if n > 0 {
171
				log.Debug(n, " keys in bitswap wantlist")
172
			}
173
		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
174
			log.Event(ctx, "Bitswap.Rebroadcast.active")
175 176 177 178
			entries := bs.wm.wl.Entries()
			if len(entries) == 0 {
				continue
			}
179 180 181 182 183

			// TODO: come up with a better strategy for determining when to search
			// for new providers for blocks.
			i := rand.Intn(len(entries))
			bs.findKeys <- &blockRequest{
184
				Cid: entries[i].Cid,
185
				Ctx: ctx,
186 187 188 189 190 191
			}
		case <-parent.Done():
			return
		}
	}
}
192 193 194

func (bs *Bitswap) providerQueryManager(ctx context.Context) {
	var activeLk sync.Mutex
195
	kset := cid.NewSet()
196 197 198 199

	for {
		select {
		case e := <-bs.findKeys:
200 201 202 203 204 205
			select { // make sure its not already cancelled
			case <-e.Ctx.Done():
				continue
			default:
			}

206
			activeLk.Lock()
207
			if kset.Has(e.Cid) {
208 209 210
				activeLk.Unlock()
				continue
			}
211
			kset.Add(e.Cid)
212 213
			activeLk.Unlock()

214
			go func(e *blockRequest) {
215 216
				child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
				defer cancel()
217
				providers := bs.network.FindProvidersAsync(child, e.Cid, maxProvidersPerRequest)
218
				wg := &sync.WaitGroup{}
219
				for p := range providers {
220
					wg.Add(1)
221
					go func(p peer.ID) {
222
						defer wg.Done()
223 224 225 226 227 228
						err := bs.network.ConnectTo(child, p)
						if err != nil {
							log.Debug("failed to connect to provider %s: %s", p, err)
						}
					}(p)
				}
229
				wg.Wait()
230
				activeLk.Lock()
231
				kset.Remove(e.Cid)
232 233 234 235 236 237 238 239
				activeLk.Unlock()
			}(e)

		case <-ctx.Done():
			return
		}
	}
}