package bitswap

import (
	"context"
	"math/rand"
	"sync"
	"time"

	process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
	procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
	peer "gx/ipfs/QmWUswjn261LSyVxWAEpMVtPdy8zmKBJJfBpG3Qdpa8ZsE/go-libp2p-peer"
)

16
var TaskWorkerCount = 8
17

18
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
19 20
	// Start up a worker to handle block requests this node is making
	px.Go(func(px process.Process) {
21
		bs.providerQueryManager(ctx)
22 23
	})

Jeromy's avatar
Jeromy committed
24 25
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
26
		i := i
Jeromy's avatar
Jeromy committed
27
		px.Go(func(px process.Process) {
28
			bs.taskWorker(ctx, i)
Jeromy's avatar
Jeromy committed
29 30
		})
	}
31 32

	// Start up a worker to manage periodically resending our wantlist out to peers
Jeromy's avatar
Jeromy committed
33 34 35
	px.Go(func(px process.Process) {
		bs.rebroadcastWorker(ctx)
	})
36

Jeromy's avatar
Jeromy committed
37
	// Start up a worker to manage sending out provides messages
38 39 40 41
	px.Go(func(px process.Process) {
		bs.provideCollector(ctx)
	})

42 43 44
	// Spawn up multiple workers to handle incoming blocks
	// consider increasing number if providing blocks bottlenecks
	// file transfers
45
	px.Go(bs.provideWorker)
46 47
}

48
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
Jeromy's avatar
Jeromy committed
49
	idmap := logging.LoggableMap{"ID": id}
50 51
	defer log.Info("bitswap task worker shutting down...")
	for {
52
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
53 54 55 56 57 58 59
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
Jeromy's avatar
Jeromy committed
60
				log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableMap{
61 62
					"ID":     id,
					"Target": envelope.Peer.Pretty(),
63
					"Block":  envelope.Block.Cid().String(),
64
				})
65

66
				bs.wm.SendBlock(ctx, envelope)
Jeromy's avatar
Jeromy committed
67 68 69 70
				bs.counterLk.Lock()
				bs.blocksSent++
				bs.dataSent += uint64(len(envelope.Block.RawData()))
				bs.counterLk.Unlock()
71 72 73 74 75 76 77 78 79
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

80 81
func (bs *Bitswap) provideWorker(px process.Process) {

82
	limit := make(chan struct{}, provideWorkerMax)
83

84
	limitedGoProvide := func(k *cid.Cid, wid int) {
85 86 87 88
		defer func() {
			// replace token when done
			<-limit
		}()
Jeromy's avatar
Jeromy committed
89
		ev := logging.LoggableMap{"ID": wid}
90

91
		ctx := procctx.OnClosingContext(px) // derive ctx from px
92
		defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done()
93

94 95
		ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
		defer cancel()
96

97
		if err := bs.network.Provide(ctx, k); err != nil {
Jeromy's avatar
Jeromy committed
98
			log.Warning(err)
99
		}
100
	}
101 102 103

	// worker spawner, reads from bs.provideKeys until it closes, spawning a
	// _ratelimited_ number of workers to handle each key.
104 105 106
	for wid := 2; ; wid++ {
		ev := logging.LoggableMap{"ID": 1}
		log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
107

108 109 110 111 112 113 114 115
		select {
		case <-px.Closing():
			return
		case k, ok := <-bs.provideKeys:
			if !ok {
				log.Debug("provideKeys channel closed")
				return
			}
116 117 118
			select {
			case <-px.Closing():
				return
119 120
			case limit <- struct{}{}:
				go limitedGoProvide(k, wid)
121 122
			}
		}
123
	}
124 125
}

126 127
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
128 129 130
	var toProvide []*cid.Cid
	var nextKey *cid.Cid
	var keysOut chan *cid.Cid
131 132 133

	for {
		select {
134
		case blkey, ok := <-bs.newBlocks:
135 136 137 138
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
139

Jeromy's avatar
Jeromy committed
140
			if keysOut == nil {
141
				nextKey = blkey
Jeromy's avatar
Jeromy committed
142 143
				keysOut = bs.provideKeys
			} else {
144
				toProvide = append(toProvide, blkey)
Jeromy's avatar
Jeromy committed
145 146
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
147 148 149
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
150
			} else {
Jeromy's avatar
Jeromy committed
151
				keysOut = nil
152 153 154 155 156 157 158
			}
		case <-ctx.Done():
			return
		}
	}
}

159
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
160 161 162
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

163 164 165 166 167
	broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
	defer broadcastSignal.Stop()

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()
168 169

	for {
170
		log.Event(ctx, "Bitswap.Rebroadcast.idle")
171
		select {
172 173
		case <-tick.C:
			n := bs.wm.wl.Len()
174
			if n > 0 {
175
				log.Debug(n, " keys in bitswap wantlist")
176
			}
177
		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
178
			log.Event(ctx, "Bitswap.Rebroadcast.active")
179 180 181 182
			entries := bs.wm.wl.Entries()
			if len(entries) == 0 {
				continue
			}
183 184 185 186 187

			// TODO: come up with a better strategy for determining when to search
			// for new providers for blocks.
			i := rand.Intn(len(entries))
			bs.findKeys <- &blockRequest{
188
				Cid: entries[i].Cid,
189
				Ctx: ctx,
190 191 192 193 194 195
			}
		case <-parent.Done():
			return
		}
	}
}
196 197 198

func (bs *Bitswap) providerQueryManager(ctx context.Context) {
	var activeLk sync.Mutex
199
	kset := cid.NewSet()
200 201 202 203

	for {
		select {
		case e := <-bs.findKeys:
204 205 206 207 208 209
			select { // make sure its not already cancelled
			case <-e.Ctx.Done():
				continue
			default:
			}

210
			activeLk.Lock()
211
			if kset.Has(e.Cid) {
212 213 214
				activeLk.Unlock()
				continue
			}
215
			kset.Add(e.Cid)
216 217
			activeLk.Unlock()

218
			go func(e *blockRequest) {
219 220
				child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
				defer cancel()
221
				providers := bs.network.FindProvidersAsync(child, e.Cid, maxProvidersPerRequest)
222
				wg := &sync.WaitGroup{}
223
				for p := range providers {
224
					wg.Add(1)
225
					go func(p peer.ID) {
226
						defer wg.Done()
227 228 229 230 231 232
						err := bs.network.ConnectTo(child, p)
						if err != nil {
							log.Debug("failed to connect to provider %s: %s", p, err)
						}
					}(p)
				}
233
				wg.Wait()
234
				activeLk.Lock()
235
				kset.Remove(e.Cid)
236 237 238 239 240 241 242 243
				activeLk.Unlock()
			}(e)

		case <-ctx.Done():
			return
		}
	}
}