workers.go 4.91 KB
Newer Older
1 2 3
package bitswap

import (
4 5
	"os"
	"strconv"
6 7
	"time"

8 9
	process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
10

11
	key "github.com/ipfs/go-ipfs/blocks/key"
12
	eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
13 14
)

15
// TaskWorkerCount is the number of task worker goroutines launched by
// startWorkers. It defaults to 8 and is replaced during package init when the
// IPFS_BITSWAP_TASK_WORKERS environment variable holds a positive integer.
var TaskWorkerCount = 8
16 17

func init() {
Jeromy Johnson's avatar
Jeromy Johnson committed
18
	twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS")
19 20 21
	if twc != "" {
		n, err := strconv.Atoi(twc)
		if err != nil {
Jeromy's avatar
Jeromy committed
22
			log.Error(err)
23 24
			return
		}
Jeromy's avatar
Jeromy committed
25 26 27
		if n > 0 {
			TaskWorkerCount = n
		} else {
Jeromy Johnson's avatar
Jeromy Johnson committed
28
			log.Errorf("Invalid value of '%d' for IPFS_BITSWAP_TASK_WORKERS", n)
Jeromy's avatar
Jeromy committed
29
		}
30 31
	}
}
Jeromy's avatar
Jeromy committed
32

33
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
34 35
	// Start up a worker to handle block requests this node is making
	px.Go(func(px process.Process) {
Jeromy's avatar
Jeromy committed
36
		bs.providerConnector(ctx)
37 38
	})

Jeromy's avatar
Jeromy committed
39 40
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
41
		i := i
Jeromy's avatar
Jeromy committed
42
		px.Go(func(px process.Process) {
43
			bs.taskWorker(ctx, i)
Jeromy's avatar
Jeromy committed
44 45
		})
	}
46 47

	// Start up a worker to manage periodically resending our wantlist out to peers
Jeromy's avatar
Jeromy committed
48 49 50
	px.Go(func(px process.Process) {
		bs.rebroadcastWorker(ctx)
	})
51

Jeromy's avatar
Jeromy committed
52
	// Start up a worker to manage sending out provides messages
53 54 55 56
	px.Go(func(px process.Process) {
		bs.provideCollector(ctx)
	})

57 58 59 60
	// Spawn up multiple workers to handle incoming blocks
	// consider increasing number if providing blocks bottlenecks
	// file transfers
	for i := 0; i < provideWorkers; i++ {
61
		i := i
62
		px.Go(func(px process.Process) {
63
			bs.provideWorker(ctx, i)
64 65 66 67
		})
	}
}

68 69
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
	idmap := eventlog.LoggableMap{"ID": id}
70 71
	defer log.Info("bitswap task worker shutting down...")
	for {
72
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
73 74 75 76 77 78 79
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
80
				log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{"ID": id, "Target": envelope.Peer.Pretty(), "Block": envelope.Block.Multihash.B58String()})
81

82
				bs.wm.SendBlock(ctx, envelope)
83 84 85 86 87 88 89 90 91
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

92 93
func (bs *Bitswap) provideWorker(ctx context.Context, id int) {
	idmap := eventlog.LoggableMap{"ID": id}
94
	for {
95
		log.Event(ctx, "Bitswap.ProvideWorker.Loop", idmap)
96
		select {
97
		case k, ok := <-bs.provideKeys:
98
			log.Event(ctx, "Bitswap.ProvideWorker.Work", idmap, &k)
99
			if !ok {
100
				log.Debug("provideKeys channel closed")
101 102
				return
			}
Henry's avatar
Henry committed
103
			ctx, cancel := context.WithTimeout(ctx, provideTimeout)
104
			err := bs.network.Provide(ctx, k)
105 106 107
			if err != nil {
				log.Error(err)
			}
Henry's avatar
Henry committed
108
			cancel()
109 110 111 112 113 114
		case <-ctx.Done():
			return
		}
	}
}

115 116
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
117 118 119
	var toProvide []key.Key
	var nextKey key.Key
	var keysOut chan key.Key
120 121 122 123 124 125 126 127

	for {
		select {
		case blk, ok := <-bs.newBlocks:
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
Jeromy's avatar
Jeromy committed
128 129 130 131
			if keysOut == nil {
				nextKey = blk.Key()
				keysOut = bs.provideKeys
			} else {
Jeromy's avatar
Jeromy committed
132
				toProvide = append(toProvide, blk.Key())
Jeromy's avatar
Jeromy committed
133 134
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
135 136 137
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
138
			} else {
Jeromy's avatar
Jeromy committed
139
				keysOut = nil
140 141 142 143 144 145 146
			}
		case <-ctx.Done():
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
147 148
// connects to providers for the given keys
func (bs *Bitswap) providerConnector(parent context.Context) {
149 150 151
	defer log.Info("bitswap client worker shutting down...")

	for {
152
		log.Event(parent, "Bitswap.ProviderConnector.Loop")
153
		select {
Jeromy's avatar
Jeromy committed
154
		case req := <-bs.findKeys:
155 156 157 158 159
			keys := req.keys
			if len(keys) == 0 {
				log.Warning("Received batch request for zero blocks")
				continue
			}
160
			log.Event(parent, "Bitswap.ProviderConnector.Work", eventlog.LoggableMap{"Keys": keys})
161 162 163 164

			// NB: Optimization. Assumes that providers of key[0] are likely to
			// be able to provide for all keys. This currently holds true in most
			// every situation. Later, this assumption may not hold as true.
Henry's avatar
Henry committed
165
			child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout)
166
			providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
167 168
			for p := range providers {
				go bs.network.ConnectTo(req.ctx, p)
169
			}
Henry's avatar
Henry committed
170
			cancel()
171

172 173 174 175 176 177
		case <-parent.Done():
			return
		}
	}
}

178
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
179 180 181
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

182 183 184 185 186
	broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
	defer broadcastSignal.Stop()

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()
187 188

	for {
189
		log.Event(ctx, "Bitswap.Rebroadcast.idle")
190
		select {
191 192
		case <-tick.C:
			n := bs.wm.wl.Len()
193
			if n > 0 {
Jeromy's avatar
Jeromy committed
194
				log.Debug(n, "keys in bitswap wantlist")
195
			}
196
		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
197
			log.Event(ctx, "Bitswap.Rebroadcast.active")
198
			entries := bs.wm.wl.Entries()
199
			if len(entries) > 0 {
200
				bs.connectToProviders(ctx, entries)
201 202 203 204 205 206
			}
		case <-parent.Done():
			return
		}
	}
}