workers.go 4.58 KB
Newer Older
1 2 3
package bitswap

import (
4 5
	"os"
	"strconv"
6 7
	"time"

8 9 10 11
	inflect "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/chuckpreslar/inflect"
	process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	u "github.com/ipfs/go-ipfs/util"
12 13
)

14 15 16 17 18 19 20
// TaskWorkerCount is the number of task workers launched by startWorkers.
// It starts at 16; init replaces it with the value of the IPFS_TASK_WORKERS
// environment variable when that variable holds a positive integer.
var TaskWorkerCount = 16

func init() {
	twc := os.Getenv("IPFS_TASK_WORKERS")
	if twc != "" {
		n, err := strconv.Atoi(twc)
		if err != nil {
Jeromy's avatar
Jeromy committed
21
			log.Error(err)
22 23
			return
		}
Jeromy's avatar
Jeromy committed
24 25 26 27 28
		if n > 0 {
			TaskWorkerCount = n
		} else {
			log.Errorf("Invalid value of '%d' for IPFS_TASK_WORKERS", n)
		}
29 30
	}
}
Jeromy's avatar
Jeromy committed
31

32
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
33 34 35 36 37
	// Start up a worker to handle block requests this node is making
	px.Go(func(px process.Process) {
		bs.clientWorker(ctx)
	})

Jeromy's avatar
Jeromy committed
38 39 40 41 42 43
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
		px.Go(func(px process.Process) {
			bs.taskWorker(ctx)
		})
	}
44 45 46 47 48 49

	// Start up a worker to manage periodically resending our wantlist out to peers
	px.Go(func(px process.Process) {
		bs.rebroadcastWorker(ctx)
	})

50 51 52 53
	px.Go(func(px process.Process) {
		bs.provideCollector(ctx)
	})

54 55 56 57 58
	// Spawn up multiple workers to handle incoming blocks
	// consider increasing number if providing blocks bottlenecks
	// file transfers
	for i := 0; i < provideWorkers; i++ {
		px.Go(func(px process.Process) {
Jeromy's avatar
Jeromy committed
59
			bs.provideWorker(ctx)
60 61 62 63
		})
	}
}

64
func (bs *Bitswap) taskWorker(ctx context.Context) {
65 66 67 68 69 70 71 72 73 74 75
	defer log.Info("bitswap task worker shutting down...")
	for {
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
				log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
				bs.send(ctx, envelope.Peer, envelope.Message)
Jeromy's avatar
Jeromy committed
76
				envelope.Sent()
77 78 79 80 81 82 83 84 85
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

86
func (bs *Bitswap) provideWorker(ctx context.Context) {
87 88
	for {
		select {
89
		case k, ok := <-bs.provideKeys:
90
			if !ok {
91
				log.Debug("provideKeys channel closed")
92 93
				return
			}
Henry's avatar
Henry committed
94
			ctx, cancel := context.WithTimeout(ctx, provideTimeout)
95
			err := bs.network.Provide(ctx, k)
96 97 98
			if err != nil {
				log.Error(err)
			}
Henry's avatar
Henry committed
99
			cancel()
100 101 102 103 104 105
		case <-ctx.Done():
			return
		}
	}
}

106 107
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
Jeromy's avatar
Jeromy committed
108
	var toProvide []u.Key
109
	var nextKey u.Key
Jeromy's avatar
Jeromy committed
110
	var keysOut chan u.Key
111 112 113 114 115 116 117 118

	for {
		select {
		case blk, ok := <-bs.newBlocks:
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
Jeromy's avatar
Jeromy committed
119 120 121 122
			if keysOut == nil {
				nextKey = blk.Key()
				keysOut = bs.provideKeys
			} else {
Jeromy's avatar
Jeromy committed
123
				toProvide = append(toProvide, blk.Key())
Jeromy's avatar
Jeromy committed
124 125
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
126 127 128
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
129
			} else {
Jeromy's avatar
Jeromy committed
130
				keysOut = nil
131 132 133 134 135 136 137
			}
		case <-ctx.Done():
			return
		}
	}
}

138
// TODO ensure only one active request per key
139
func (bs *Bitswap) clientWorker(parent context.Context) {
140 141 142 143 144 145 146 147 148 149 150 151 152 153
	defer log.Info("bitswap client worker shutting down...")

	for {
		select {
		case req := <-bs.batchRequests:
			keys := req.keys
			if len(keys) == 0 {
				log.Warning("Received batch request for zero blocks")
				continue
			}
			for i, k := range keys {
				bs.wantlist.Add(k, kMaxPriority-i)
			}

154 155 156 157 158
			done := make(chan struct{})
			go func() {
				bs.wantNewBlocks(req.ctx, keys)
				close(done)
			}()
159 160 161 162

			// NB: Optimization. Assumes that providers of key[0] are likely to
			// be able to provide for all keys. This currently holds true in most
			// every situation. Later, this assumption may not hold as true.
Henry's avatar
Henry committed
163
			child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout)
164 165 166 167 168
			providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
			err := bs.sendWantlistToPeers(req.ctx, providers)
			if err != nil {
				log.Debugf("error sending wantlist: %s", err)
			}
Henry's avatar
Henry committed
169
			cancel()
170 171 172 173

			// Wait for wantNewBlocks to finish
			<-done

174 175 176 177 178 179
		case <-parent.Done():
			return
		}
	}
}

180
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	broadcastSignal := time.After(rebroadcastDelay.Get())

	for {
		select {
		case <-time.Tick(10 * time.Second):
			n := bs.wantlist.Len()
			if n > 0 {
				log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
			}
		case <-broadcastSignal: // resend unfulfilled wantlist keys
			entries := bs.wantlist.Entries()
			if len(entries) > 0 {
				bs.sendWantlistToProviders(ctx, entries)
			}
			broadcastSignal = time.After(rebroadcastDelay.Get())
		case <-parent.Done():
			return
		}
	}
}