workers.go 5.27 KB
Newer Older
1 2 3 4 5
package bitswap

import (
	"time"

6
	process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
7
	procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
8
	ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
9
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
10

11
	key "github.com/ipfs/go-ipfs/blocks/key"
12
	eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
13 14
)

15
var TaskWorkerCount = 8
16

17
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
18 19
	// Start up a worker to handle block requests this node is making
	px.Go(func(px process.Process) {
Jeromy's avatar
Jeromy committed
20
		bs.providerConnector(ctx)
21 22
	})

Jeromy's avatar
Jeromy committed
23 24
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
25
		i := i
Jeromy's avatar
Jeromy committed
26
		px.Go(func(px process.Process) {
27
			bs.taskWorker(ctx, i)
Jeromy's avatar
Jeromy committed
28 29
		})
	}
30 31

	// Start up a worker to manage periodically resending our wantlist out to peers
Jeromy's avatar
Jeromy committed
32 33 34
	px.Go(func(px process.Process) {
		bs.rebroadcastWorker(ctx)
	})
35

Jeromy's avatar
Jeromy committed
36
	// Start up a worker to manage sending out provides messages
37 38 39 40
	px.Go(func(px process.Process) {
		bs.provideCollector(ctx)
	})

41 42 43
	// Spawn up multiple workers to handle incoming blocks
	// consider increasing number if providing blocks bottlenecks
	// file transfers
44
	px.Go(bs.provideWorker)
45 46
}

47 48
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
	idmap := eventlog.LoggableMap{"ID": id}
49 50
	defer log.Info("bitswap task worker shutting down...")
	for {
51
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
52 53 54 55 56 57 58
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
59 60 61 62 63
				log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{
					"ID":     id,
					"Target": envelope.Peer.Pretty(),
					"Block":  envelope.Block.Multihash.B58String(),
				})
64

65
				bs.wm.SendBlock(ctx, envelope)
66 67 68 69 70 71 72 73 74
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

75 76 77 78 79 80 81 82
func (bs *Bitswap) provideWorker(px process.Process) {

	limiter := ratelimit.NewRateLimiter(px, provideWorkerMax)

	limitedGoProvide := func(k key.Key, wid int) {
		ev := eventlog.LoggableMap{"ID": wid}
		limiter.LimitedGo(func(px process.Process) {

83
			ctx := procctx.OnClosingContext(px) // derive ctx from px
84 85 86 87 88 89
			defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()

			ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
			defer cancel()

			if err := bs.network.Provide(ctx, k); err != nil {
90 91
				log.Error(err)
			}
92
		})
93
	}
94 95 96 97 98 99

	// worker spawner, reads from bs.provideKeys until it closes, spawning a
	// _ratelimited_ number of workers to handle each key.
	limiter.Go(func(px process.Process) {
		for wid := 2; ; wid++ {
			ev := eventlog.LoggableMap{"ID": 1}
100
			log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
101 102 103 104 105 106 107 108 109 110 111 112 113

			select {
			case <-px.Closing():
				return
			case k, ok := <-bs.provideKeys:
				if !ok {
					log.Debug("provideKeys channel closed")
					return
				}
				limitedGoProvide(k, wid)
			}
		}
	})
114 115
}

116 117
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
118 119 120
	var toProvide []key.Key
	var nextKey key.Key
	var keysOut chan key.Key
121 122 123 124 125 126 127 128

	for {
		select {
		case blk, ok := <-bs.newBlocks:
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
Jeromy's avatar
Jeromy committed
129 130 131 132
			if keysOut == nil {
				nextKey = blk.Key()
				keysOut = bs.provideKeys
			} else {
Jeromy's avatar
Jeromy committed
133
				toProvide = append(toProvide, blk.Key())
Jeromy's avatar
Jeromy committed
134 135
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
136 137 138
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
139
			} else {
Jeromy's avatar
Jeromy committed
140
				keysOut = nil
141 142 143 144 145 146 147
			}
		case <-ctx.Done():
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
148 149
// connects to providers for the given keys
func (bs *Bitswap) providerConnector(parent context.Context) {
150 151 152
	defer log.Info("bitswap client worker shutting down...")

	for {
153
		log.Event(parent, "Bitswap.ProviderConnector.Loop")
154
		select {
Jeromy's avatar
Jeromy committed
155
		case req := <-bs.findKeys:
156 157 158 159 160
			keys := req.keys
			if len(keys) == 0 {
				log.Warning("Received batch request for zero blocks")
				continue
			}
161
			log.Event(parent, "Bitswap.ProviderConnector.Work", eventlog.LoggableMap{"Keys": keys})
162 163 164 165

			// NB: Optimization. Assumes that providers of key[0] are likely to
			// be able to provide for all keys. This currently holds true in most
			// every situation. Later, this assumption may not hold as true.
Henry's avatar
Henry committed
166
			child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout)
167
			providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
168 169
			for p := range providers {
				go bs.network.ConnectTo(req.ctx, p)
170
			}
Henry's avatar
Henry committed
171
			cancel()
172

173 174 175 176 177 178
		case <-parent.Done():
			return
		}
	}
}

179
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
180 181 182
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

183 184 185 186 187
	broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
	defer broadcastSignal.Stop()

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()
188 189

	for {
190
		log.Event(ctx, "Bitswap.Rebroadcast.idle")
191
		select {
192 193
		case <-tick.C:
			n := bs.wm.wl.Len()
194
			if n > 0 {
Jeromy's avatar
Jeromy committed
195
				log.Debug(n, "keys in bitswap wantlist")
196
			}
197
		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
198
			log.Event(ctx, "Bitswap.Rebroadcast.active")
199
			entries := bs.wm.wl.Entries()
200
			if len(entries) > 0 {
201
				bs.connectToProviders(ctx, entries)
202 203 204 205 206 207
			}
		case <-parent.Done():
			return
		}
	}
}