workers.go 5.23 KB
Newer Older
1 2 3
package bitswap

import (
4
	"sync"
5 6
	"time"

George Antoniadis's avatar
George Antoniadis committed
7 8
	process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
	procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
9
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
10

11
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
Jeromy's avatar
Jeromy committed
12 13
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
	peer "gx/ipfs/QmWtbQU15LaB5B1JC2F7TV9P4K88vD3PpA4AJrwfCjhML8/go-libp2p-peer"
George Antoniadis's avatar
George Antoniadis committed
14
	key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
15 16
)

17
// TaskWorkerCount is the number of taskWorker goroutines that startWorkers
// launches.
var TaskWorkerCount = 8
18

19
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
20 21
	// Start up a worker to handle block requests this node is making
	px.Go(func(px process.Process) {
22
		bs.providerQueryManager(ctx)
23 24
	})

Jeromy's avatar
Jeromy committed
25 26
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
27
		i := i
Jeromy's avatar
Jeromy committed
28
		px.Go(func(px process.Process) {
29
			bs.taskWorker(ctx, i)
Jeromy's avatar
Jeromy committed
30 31
		})
	}
32 33

	// Start up a worker to manage periodically resending our wantlist out to peers
Jeromy's avatar
Jeromy committed
34 35 36
	px.Go(func(px process.Process) {
		bs.rebroadcastWorker(ctx)
	})
37

Jeromy's avatar
Jeromy committed
38
	// Start up a worker to manage sending out provides messages
39 40 41 42
	px.Go(func(px process.Process) {
		bs.provideCollector(ctx)
	})

43 44 45
	// Spawn up multiple workers to handle incoming blocks
	// consider increasing number if providing blocks bottlenecks
	// file transfers
46
	px.Go(bs.provideWorker)
47 48
}

49
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
Jeromy's avatar
Jeromy committed
50
	idmap := logging.LoggableMap{"ID": id}
51 52
	defer log.Info("bitswap task worker shutting down...")
	for {
53
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
54 55 56 57 58 59 60
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
Jeromy's avatar
Jeromy committed
61
				log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableMap{
62 63
					"ID":     id,
					"Target": envelope.Peer.Pretty(),
64
					"Block":  envelope.Block.Multihash().B58String(),
65
				})
66

67
				bs.wm.SendBlock(ctx, envelope)
68 69 70 71 72 73 74 75 76
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

77 78
func (bs *Bitswap) provideWorker(px process.Process) {

79
	limit := make(chan struct{}, provideWorkerMax)
80 81

	limitedGoProvide := func(k key.Key, wid int) {
82 83 84 85
		defer func() {
			// replace token when done
			<-limit
		}()
Jeromy's avatar
Jeromy committed
86
		ev := logging.LoggableMap{"ID": wid}
87

88 89
		ctx := procctx.OnClosingContext(px) // derive ctx from px
		defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
90

91 92
		ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
		defer cancel()
93

94
		if err := bs.network.Provide(ctx, k); err != nil {
Jeromy's avatar
Jeromy committed
95
			log.Warning(err)
96
		}
97
	}
98 99 100

	// worker spawner, reads from bs.provideKeys until it closes, spawning a
	// _ratelimited_ number of workers to handle each key.
101 102 103
	for wid := 2; ; wid++ {
		ev := logging.LoggableMap{"ID": 1}
		log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
104

105 106 107 108 109 110 111 112
		select {
		case <-px.Closing():
			return
		case k, ok := <-bs.provideKeys:
			if !ok {
				log.Debug("provideKeys channel closed")
				return
			}
113 114 115
			select {
			case <-px.Closing():
				return
116 117
			case limit <- struct{}{}:
				go limitedGoProvide(k, wid)
118 119
			}
		}
120
	}
121 122
}

123 124
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
125 126 127
	var toProvide []key.Key
	var nextKey key.Key
	var keysOut chan key.Key
128 129 130 131 132 133 134 135

	for {
		select {
		case blk, ok := <-bs.newBlocks:
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
136

Jeromy's avatar
Jeromy committed
137 138 139 140
			if keysOut == nil {
				nextKey = blk.Key()
				keysOut = bs.provideKeys
			} else {
Jeromy's avatar
Jeromy committed
141
				toProvide = append(toProvide, blk.Key())
Jeromy's avatar
Jeromy committed
142 143
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
144 145 146
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
147
			} else {
Jeromy's avatar
Jeromy committed
148
				keysOut = nil
149 150 151 152 153 154 155
			}
		case <-ctx.Done():
			return
		}
	}
}

156
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
157 158 159
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

160 161 162 163 164
	broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
	defer broadcastSignal.Stop()

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()
165 166

	for {
167
		log.Event(ctx, "Bitswap.Rebroadcast.idle")
168
		select {
169 170
		case <-tick.C:
			n := bs.wm.wl.Len()
171
			if n > 0 {
172
				log.Debug(n, " keys in bitswap wantlist")
173
			}
174
		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
175
			log.Event(ctx, "Bitswap.Rebroadcast.active")
176
			for _, e := range bs.wm.wl.Entries() {
Jeromy's avatar
Jeromy committed
177
				e := e
178
				bs.findKeys <- &e
179 180 181 182 183 184
			}
		case <-parent.Done():
			return
		}
	}
}
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222

// providerQueryManager receives wantlist entries from bs.findKeys and, for
// each key that does not already have a query in flight, looks up providers
// via bs.network.FindProvidersAsync and tries to connect to every provider
// found. It returns when ctx is done.
func (bs *Bitswap) providerQueryManager(ctx context.Context) {
	// active tracks keys with a running query so duplicates are dropped.
	var activeLk sync.Mutex
	active := make(map[key.Key]*wantlist.Entry)

	for {
		select {
		case e := <-bs.findKeys:
			activeLk.Lock()
			if _, ok := active[e.Key]; ok {
				activeLk.Unlock()
				continue
			}
			active[e.Key] = e
			activeLk.Unlock()

			go func(e *wantlist.Entry) {
				// The query is bounded by the entry's own context plus a timeout.
				child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
				defer cancel()
				providers := bs.network.FindProvidersAsync(child, e.Key, maxProvidersPerRequest)

				// Wait for the connection attempts before returning: they use
				// child, and the deferred cancel would otherwise abort them as
				// soon as the providers channel is drained.
				var wg sync.WaitGroup
				for p := range providers {
					wg.Add(1)
					go func(p peer.ID) {
						defer wg.Done()
						if err := bs.network.ConnectTo(child, p); err != nil {
							log.Debugf("failed to connect to provider %s: %s", p, err)
						}
					}(p)
				}
				wg.Wait()

				activeLk.Lock()
				delete(active, e.Key)
				activeLk.Unlock()
			}(e)

		case <-ctx.Done():
			return
		}
	}
}