workers.go 5.75 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5
	"math/rand"
6
	"sync"
7 8
	"time"

Jeromy's avatar
Jeromy committed
9
	bsmsg "github.com/ipfs/go-bitswap/message"
10

Jeromy's avatar
Jeromy committed
11 12 13 14 15
	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	process "github.com/jbenet/goprocess"
	procctx "github.com/jbenet/goprocess/context"
	peer "github.com/libp2p/go-libp2p-peer"
16 17
)

18
// TaskWorkerCount is the number of task worker goroutines that startWorkers
// launches to handle requests from other nodes for data on this node.
var TaskWorkerCount = 8
19

20
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
21 22
	// Start up a worker to handle block requests this node is making
	px.Go(func(px process.Process) {
23
		bs.providerQueryManager(ctx)
24 25
	})

Jeromy's avatar
Jeromy committed
26 27
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
28
		i := i
Jeromy's avatar
Jeromy committed
29
		px.Go(func(px process.Process) {
30
			bs.taskWorker(ctx, i)
Jeromy's avatar
Jeromy committed
31 32
		})
	}
33 34

	// Start up a worker to manage periodically resending our wantlist out to peers
Jeromy's avatar
Jeromy committed
35 36 37
	px.Go(func(px process.Process) {
		bs.rebroadcastWorker(ctx)
	})
38

Jeromy's avatar
Jeromy committed
39
	// Start up a worker to manage sending out provides messages
40 41 42 43
	px.Go(func(px process.Process) {
		bs.provideCollector(ctx)
	})

44 45 46
	// Spawn up multiple workers to handle incoming blocks
	// consider increasing number if providing blocks bottlenecks
	// file transfers
47
	px.Go(bs.provideWorker)
48 49
}

50
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
Jeromy's avatar
Jeromy committed
51
	idmap := logging.LoggableMap{"ID": id}
Jeromy's avatar
Jeromy committed
52
	defer log.Debug("bitswap task worker shutting down...")
53
	for {
54
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
55 56 57 58 59 60 61
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
62 63 64 65 66 67 68
				log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableF(func() map[string]interface{} {
					return logging.LoggableMap{
						"ID":     id,
						"Target": envelope.Peer.Pretty(),
						"Block":  envelope.Block.Cid().String(),
					}
				}))
69

70 71 72 73 74 75
				// update the BS ledger to reflect sent message
				// TODO: Should only track *useful* messages in ledger
				outgoing := bsmsg.New(false)
				outgoing.AddBlock(envelope.Block)
				bs.engine.MessageSent(envelope.Peer, outgoing)

76
				bs.wm.SendBlock(ctx, envelope)
Jeromy's avatar
Jeromy committed
77
				bs.counterLk.Lock()
78 79
				bs.counters.blocksSent++
				bs.counters.dataSent += uint64(len(envelope.Block.RawData()))
Jeromy's avatar
Jeromy committed
80
				bs.counterLk.Unlock()
81 82 83 84 85 86 87 88 89
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

90 91
func (bs *Bitswap) provideWorker(px process.Process) {

92
	limit := make(chan struct{}, provideWorkerMax)
93

94
	limitedGoProvide := func(k *cid.Cid, wid int) {
95 96 97 98
		defer func() {
			// replace token when done
			<-limit
		}()
Jeromy's avatar
Jeromy committed
99
		ev := logging.LoggableMap{"ID": wid}
100

101
		ctx := procctx.OnClosingContext(px) // derive ctx from px
102
		defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done()
103

104 105
		ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
		defer cancel()
106

107
		if err := bs.network.Provide(ctx, k); err != nil {
Jeromy's avatar
Jeromy committed
108
			log.Warning(err)
109
		}
110
	}
111 112 113

	// worker spawner, reads from bs.provideKeys until it closes, spawning a
	// _ratelimited_ number of workers to handle each key.
114 115 116
	for wid := 2; ; wid++ {
		ev := logging.LoggableMap{"ID": 1}
		log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
117

118 119 120 121 122 123 124 125
		select {
		case <-px.Closing():
			return
		case k, ok := <-bs.provideKeys:
			if !ok {
				log.Debug("provideKeys channel closed")
				return
			}
126 127 128
			select {
			case <-px.Closing():
				return
129 130
			case limit <- struct{}{}:
				go limitedGoProvide(k, wid)
131 132
			}
		}
133
	}
134 135
}

136 137
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
138 139 140
	var toProvide []*cid.Cid
	var nextKey *cid.Cid
	var keysOut chan *cid.Cid
141 142 143

	for {
		select {
144
		case blkey, ok := <-bs.newBlocks:
145 146 147 148
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
149

Jeromy's avatar
Jeromy committed
150
			if keysOut == nil {
151
				nextKey = blkey
Jeromy's avatar
Jeromy committed
152 153
				keysOut = bs.provideKeys
			} else {
154
				toProvide = append(toProvide, blkey)
Jeromy's avatar
Jeromy committed
155 156
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
157 158 159
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
160
			} else {
Jeromy's avatar
Jeromy committed
161
				keysOut = nil
162 163 164 165 166 167 168
			}
		case <-ctx.Done():
			return
		}
	}
}

169
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
170 171 172
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

173 174 175 176 177
	broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
	defer broadcastSignal.Stop()

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()
178 179

	for {
180
		log.Event(ctx, "Bitswap.Rebroadcast.idle")
181
		select {
182 183
		case <-tick.C:
			n := bs.wm.wl.Len()
184
			if n > 0 {
185
				log.Debug(n, " keys in bitswap wantlist")
186
			}
187
		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
188
			log.Event(ctx, "Bitswap.Rebroadcast.active")
189 190 191 192
			entries := bs.wm.wl.Entries()
			if len(entries) == 0 {
				continue
			}
193 194 195 196 197

			// TODO: come up with a better strategy for determining when to search
			// for new providers for blocks.
			i := rand.Intn(len(entries))
			bs.findKeys <- &blockRequest{
198
				Cid: entries[i].Cid,
199
				Ctx: ctx,
200 201 202 203 204 205
			}
		case <-parent.Done():
			return
		}
	}
}
206 207 208

func (bs *Bitswap) providerQueryManager(ctx context.Context) {
	var activeLk sync.Mutex
209
	kset := cid.NewSet()
210 211 212 213

	for {
		select {
		case e := <-bs.findKeys:
214 215 216 217 218 219
			select { // make sure its not already cancelled
			case <-e.Ctx.Done():
				continue
			default:
			}

220
			activeLk.Lock()
221
			if kset.Has(e.Cid) {
222 223 224
				activeLk.Unlock()
				continue
			}
225
			kset.Add(e.Cid)
226 227
			activeLk.Unlock()

228
			go func(e *blockRequest) {
229 230
				child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
				defer cancel()
231
				providers := bs.network.FindProvidersAsync(child, e.Cid, maxProvidersPerRequest)
232
				wg := &sync.WaitGroup{}
233
				for p := range providers {
234
					wg.Add(1)
235
					go func(p peer.ID) {
236
						defer wg.Done()
237 238 239 240 241 242
						err := bs.network.ConnectTo(child, p)
						if err != nil {
							log.Debug("failed to connect to provider %s: %s", p, err)
						}
					}(p)
				}
243
				wg.Wait()
244
				activeLk.Lock()
245
				kset.Remove(e.Cid)
246 247 248 249 250 251 252 253
				activeLk.Unlock()
			}(e)

		case <-ctx.Done():
			return
		}
	}
}