workers.go 5.86 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5
	"math/rand"
6
	"sync"
7 8
	"time"

9 10
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"

11 12
	process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
	procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
Jeromy's avatar
Jeromy committed
13
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
14
	cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
15
	peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
16 17
)

18
var TaskWorkerCount = 8
19

20
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
21 22
	// Start up a worker to handle block requests this node is making
	px.Go(func(px process.Process) {
23
		bs.providerQueryManager(ctx)
24 25
	})

Jeromy's avatar
Jeromy committed
26 27
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
28
		i := i
Jeromy's avatar
Jeromy committed
29
		px.Go(func(px process.Process) {
30
			bs.taskWorker(ctx, i)
Jeromy's avatar
Jeromy committed
31 32
		})
	}
33 34

	// Start up a worker to manage periodically resending our wantlist out to peers
Jeromy's avatar
Jeromy committed
35 36 37
	px.Go(func(px process.Process) {
		bs.rebroadcastWorker(ctx)
	})
38

Jeromy's avatar
Jeromy committed
39
	// Start up a worker to manage sending out provides messages
40 41 42 43
	px.Go(func(px process.Process) {
		bs.provideCollector(ctx)
	})

44 45 46
	// Spawn up multiple workers to handle incoming blocks
	// consider increasing number if providing blocks bottlenecks
	// file transfers
47
	px.Go(bs.provideWorker)
48 49
}

50
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
Jeromy's avatar
Jeromy committed
51
	idmap := logging.LoggableMap{"ID": id}
52 53
	defer log.Info("bitswap task worker shutting down...")
	for {
54
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
55 56 57 58 59 60 61
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
Jeromy's avatar
Jeromy committed
62
				log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableMap{
63 64
					"ID":     id,
					"Target": envelope.Peer.Pretty(),
65
					"Block":  envelope.Block.Cid().String(),
66
				})
67

68 69 70 71 72 73
				// update the BS ledger to reflect sent message
				// TODO: Should only track *useful* messages in ledger
				outgoing := bsmsg.New(false)
				outgoing.AddBlock(envelope.Block)
				bs.engine.MessageSent(envelope.Peer, outgoing)

74
				bs.wm.SendBlock(ctx, envelope)
Jeromy's avatar
Jeromy committed
75 76 77 78
				bs.counterLk.Lock()
				bs.blocksSent++
				bs.dataSent += uint64(len(envelope.Block.RawData()))
				bs.counterLk.Unlock()
79 80 81 82 83 84 85 86 87
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

88 89
func (bs *Bitswap) provideWorker(px process.Process) {

90
	limit := make(chan struct{}, provideWorkerMax)
91

92
	limitedGoProvide := func(k *cid.Cid, wid int) {
93 94 95 96
		defer func() {
			// replace token when done
			<-limit
		}()
Jeromy's avatar
Jeromy committed
97
		ev := logging.LoggableMap{"ID": wid}
98

99
		ctx := procctx.OnClosingContext(px) // derive ctx from px
100
		defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done()
101

102 103
		ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
		defer cancel()
104

105
		if err := bs.network.Provide(ctx, k); err != nil {
Jeromy's avatar
Jeromy committed
106
			log.Warning(err)
107
		}
108
	}
109 110 111

	// worker spawner, reads from bs.provideKeys until it closes, spawning a
	// _ratelimited_ number of workers to handle each key.
112 113 114
	for wid := 2; ; wid++ {
		ev := logging.LoggableMap{"ID": 1}
		log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
115

116 117 118 119 120 121 122 123
		select {
		case <-px.Closing():
			return
		case k, ok := <-bs.provideKeys:
			if !ok {
				log.Debug("provideKeys channel closed")
				return
			}
124 125 126
			select {
			case <-px.Closing():
				return
127 128
			case limit <- struct{}{}:
				go limitedGoProvide(k, wid)
129 130
			}
		}
131
	}
132 133
}

134 135
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
136 137 138
	var toProvide []*cid.Cid
	var nextKey *cid.Cid
	var keysOut chan *cid.Cid
139 140 141

	for {
		select {
142
		case blkey, ok := <-bs.newBlocks:
143 144 145 146
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
147

Jeromy's avatar
Jeromy committed
148
			if keysOut == nil {
149
				nextKey = blkey
Jeromy's avatar
Jeromy committed
150 151
				keysOut = bs.provideKeys
			} else {
152
				toProvide = append(toProvide, blkey)
Jeromy's avatar
Jeromy committed
153 154
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
155 156 157
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
158
			} else {
Jeromy's avatar
Jeromy committed
159
				keysOut = nil
160 161 162 163 164 165 166
			}
		case <-ctx.Done():
			return
		}
	}
}

167
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
168 169 170
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

171 172 173 174 175
	broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
	defer broadcastSignal.Stop()

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()
176 177

	for {
178
		log.Event(ctx, "Bitswap.Rebroadcast.idle")
179
		select {
180 181
		case <-tick.C:
			n := bs.wm.wl.Len()
182
			if n > 0 {
183
				log.Debug(n, " keys in bitswap wantlist")
184
			}
185
		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
186
			log.Event(ctx, "Bitswap.Rebroadcast.active")
187 188 189 190
			entries := bs.wm.wl.Entries()
			if len(entries) == 0 {
				continue
			}
191 192 193 194 195

			// TODO: come up with a better strategy for determining when to search
			// for new providers for blocks.
			i := rand.Intn(len(entries))
			bs.findKeys <- &blockRequest{
196
				Cid: entries[i].Cid,
197
				Ctx: ctx,
198 199 200 201 202 203
			}
		case <-parent.Done():
			return
		}
	}
}
204 205 206

func (bs *Bitswap) providerQueryManager(ctx context.Context) {
	var activeLk sync.Mutex
207
	kset := cid.NewSet()
208 209 210 211

	for {
		select {
		case e := <-bs.findKeys:
212 213 214 215 216 217
			select { // make sure its not already cancelled
			case <-e.Ctx.Done():
				continue
			default:
			}

218
			activeLk.Lock()
219
			if kset.Has(e.Cid) {
220 221 222
				activeLk.Unlock()
				continue
			}
223
			kset.Add(e.Cid)
224 225
			activeLk.Unlock()

226
			go func(e *blockRequest) {
227 228
				child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
				defer cancel()
229
				providers := bs.network.FindProvidersAsync(child, e.Cid, maxProvidersPerRequest)
230
				wg := &sync.WaitGroup{}
231
				for p := range providers {
232
					wg.Add(1)
233
					go func(p peer.ID) {
234
						defer wg.Done()
235 236 237 238 239 240
						err := bs.network.ConnectTo(child, p)
						if err != nil {
							log.Debug("failed to connect to provider %s: %s", p, err)
						}
					}(p)
				}
241
				wg.Wait()
242
				activeLk.Lock()
243
				kset.Remove(e.Cid)
244 245 246 247 248 249 250 251
				activeLk.Unlock()
			}(e)

		case <-ctx.Done():
			return
		}
	}
}