workers.go 5.21 KB
Newer Older
1 2 3 4 5
package bitswap

import (
	"time"

6
	process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
7
	procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
8
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
9

10
	key "github.com/ipfs/go-ipfs/blocks/key"
11
	logging "gx/ipfs/QmaPaGNE2GqnfJjRRpQuQuFHuJn4FZvsrGxdik4kgxCkBi/go-log"
12 13
)

14
var TaskWorkerCount = 8
15

16
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
17 18
	// Start up a worker to handle block requests this node is making
	px.Go(func(px process.Process) {
Jeromy's avatar
Jeromy committed
19
		bs.providerConnector(ctx)
20 21
	})

Jeromy's avatar
Jeromy committed
22 23
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
24
		i := i
Jeromy's avatar
Jeromy committed
25
		px.Go(func(px process.Process) {
26
			bs.taskWorker(ctx, i)
Jeromy's avatar
Jeromy committed
27 28
		})
	}
29 30

	// Start up a worker to manage periodically resending our wantlist out to peers
Jeromy's avatar
Jeromy committed
31 32 33
	px.Go(func(px process.Process) {
		bs.rebroadcastWorker(ctx)
	})
34

Jeromy's avatar
Jeromy committed
35
	// Start up a worker to manage sending out provides messages
36 37 38 39
	px.Go(func(px process.Process) {
		bs.provideCollector(ctx)
	})

40 41 42
	// Spawn up multiple workers to handle incoming blocks
	// consider increasing number if providing blocks bottlenecks
	// file transfers
43
	px.Go(bs.provideWorker)
44 45
}

46
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
Jeromy's avatar
Jeromy committed
47
	idmap := logging.LoggableMap{"ID": id}
48 49
	defer log.Info("bitswap task worker shutting down...")
	for {
50
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
51 52 53 54 55 56 57
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
Jeromy's avatar
Jeromy committed
58
				log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableMap{
59 60 61 62
					"ID":     id,
					"Target": envelope.Peer.Pretty(),
					"Block":  envelope.Block.Multihash.B58String(),
				})
63

64
				bs.wm.SendBlock(ctx, envelope)
65 66 67 68 69 70 71 72 73
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

74 75
func (bs *Bitswap) provideWorker(px process.Process) {

76
	limit := make(chan struct{}, provideWorkerMax)
77 78

	limitedGoProvide := func(k key.Key, wid int) {
79 80 81 82
		defer func() {
			// replace token when done
			<-limit
		}()
Jeromy's avatar
Jeromy committed
83
		ev := logging.LoggableMap{"ID": wid}
84

85 86
		ctx := procctx.OnClosingContext(px) // derive ctx from px
		defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
87

88 89
		ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
		defer cancel()
90

91
		if err := bs.network.Provide(ctx, k); err != nil {
Jeromy's avatar
Jeromy committed
92
			log.Warning(err)
93
		}
94
	}
95 96 97

	// worker spawner, reads from bs.provideKeys until it closes, spawning a
	// _ratelimited_ number of workers to handle each key.
98 99 100
	for wid := 2; ; wid++ {
		ev := logging.LoggableMap{"ID": 1}
		log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
101

102 103 104 105 106 107 108 109
		select {
		case <-px.Closing():
			return
		case k, ok := <-bs.provideKeys:
			if !ok {
				log.Debug("provideKeys channel closed")
				return
			}
110 111 112
			select {
			case <-px.Closing():
				return
113 114
			case limit <- struct{}{}:
				go limitedGoProvide(k, wid)
115 116
			}
		}
117
	}
118 119
}

120 121
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
122 123 124
	var toProvide []key.Key
	var nextKey key.Key
	var keysOut chan key.Key
125 126 127 128 129 130 131 132

	for {
		select {
		case blk, ok := <-bs.newBlocks:
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
Jeromy's avatar
Jeromy committed
133 134 135 136
			if keysOut == nil {
				nextKey = blk.Key()
				keysOut = bs.provideKeys
			} else {
Jeromy's avatar
Jeromy committed
137
				toProvide = append(toProvide, blk.Key())
Jeromy's avatar
Jeromy committed
138 139
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
140 141 142
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
143
			} else {
Jeromy's avatar
Jeromy committed
144
				keysOut = nil
145 146 147 148 149 150 151
			}
		case <-ctx.Done():
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
152 153
// connects to providers for the given keys
func (bs *Bitswap) providerConnector(parent context.Context) {
154 155 156
	defer log.Info("bitswap client worker shutting down...")

	for {
157
		log.Event(parent, "Bitswap.ProviderConnector.Loop")
158
		select {
Jeromy's avatar
Jeromy committed
159
		case req := <-bs.findKeys:
160 161 162 163 164
			keys := req.keys
			if len(keys) == 0 {
				log.Warning("Received batch request for zero blocks")
				continue
			}
Jeromy's avatar
Jeromy committed
165
			log.Event(parent, "Bitswap.ProviderConnector.Work", logging.LoggableMap{"Keys": keys})
166 167 168 169

			// NB: Optimization. Assumes that providers of key[0] are likely to
			// be able to provide for all keys. This currently holds true in most
			// every situation. Later, this assumption may not hold as true.
Henry's avatar
Henry committed
170
			child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout)
171
			providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
172 173
			for p := range providers {
				go bs.network.ConnectTo(req.ctx, p)
174
			}
Henry's avatar
Henry committed
175
			cancel()
176

177 178 179 180 181 182
		case <-parent.Done():
			return
		}
	}
}

183
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
184 185 186
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

187 188 189 190 191
	broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
	defer broadcastSignal.Stop()

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()
192 193

	for {
194
		log.Event(ctx, "Bitswap.Rebroadcast.idle")
195
		select {
196 197
		case <-tick.C:
			n := bs.wm.wl.Len()
198
			if n > 0 {
Jeromy's avatar
Jeromy committed
199
				log.Debug(n, "keys in bitswap wantlist")
200
			}
201
		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
202
			log.Event(ctx, "Bitswap.Rebroadcast.active")
203
			entries := bs.wm.wl.Entries()
204
			if len(entries) > 0 {
205
				bs.connectToProviders(ctx, entries)
206 207 208 209 210 211
			}
		case <-parent.Done():
			return
		}
	}
}