workers.go 5.2 KB
Newer Older
1 2 3
package bitswap

import (
4
	"sync"
5 6
	"time"

7 8
	process "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
	procctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
9
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
10

11
	key "github.com/ipfs/go-ipfs/blocks/key"
12
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
13 14
	logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log"
	peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
15 16
)

17
// TaskWorkerCount is the number of taskWorker goroutines that startWorkers
// launches.
var TaskWorkerCount = 8
18

19
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
20 21
	// Start up a worker to handle block requests this node is making
	px.Go(func(px process.Process) {
22
		bs.providerQueryManager(ctx)
23 24
	})

Jeromy's avatar
Jeromy committed
25 26
	// Start up workers to handle requests from other nodes for the data on this node
	for i := 0; i < TaskWorkerCount; i++ {
27
		i := i
Jeromy's avatar
Jeromy committed
28
		px.Go(func(px process.Process) {
29
			bs.taskWorker(ctx, i)
Jeromy's avatar
Jeromy committed
30 31
		})
	}
32 33

	// Start up a worker to manage periodically resending our wantlist out to peers
Jeromy's avatar
Jeromy committed
34 35 36
	px.Go(func(px process.Process) {
		bs.rebroadcastWorker(ctx)
	})
37

Jeromy's avatar
Jeromy committed
38
	// Start up a worker to manage sending out provides messages
39 40 41 42
	px.Go(func(px process.Process) {
		bs.provideCollector(ctx)
	})

43 44 45
	// Spawn up multiple workers to handle incoming blocks
	// consider increasing number if providing blocks bottlenecks
	// file transfers
46
	px.Go(bs.provideWorker)
47 48
}

49
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
Jeromy's avatar
Jeromy committed
50
	idmap := logging.LoggableMap{"ID": id}
51 52
	defer log.Info("bitswap task worker shutting down...")
	for {
53
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
54 55 56 57 58 59 60
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
Jeromy's avatar
Jeromy committed
61
				log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableMap{
62 63
					"ID":     id,
					"Target": envelope.Peer.Pretty(),
64
					"Block":  envelope.Block.Multihash().B58String(),
65
				})
66

67
				bs.wm.SendBlock(ctx, envelope)
68 69 70 71 72 73 74 75 76
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

77 78
func (bs *Bitswap) provideWorker(px process.Process) {

79
	limit := make(chan struct{}, provideWorkerMax)
80 81

	limitedGoProvide := func(k key.Key, wid int) {
82 83 84 85
		defer func() {
			// replace token when done
			<-limit
		}()
Jeromy's avatar
Jeromy committed
86
		ev := logging.LoggableMap{"ID": wid}
87

88 89
		ctx := procctx.OnClosingContext(px) // derive ctx from px
		defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
90

91 92
		ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
		defer cancel()
93

94
		if err := bs.network.Provide(ctx, k); err != nil {
Jeromy's avatar
Jeromy committed
95
			log.Warning(err)
96
		}
97
	}
98 99 100

	// worker spawner, reads from bs.provideKeys until it closes, spawning a
	// _ratelimited_ number of workers to handle each key.
101 102 103
	for wid := 2; ; wid++ {
		ev := logging.LoggableMap{"ID": 1}
		log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
104

105 106 107 108 109 110 111 112
		select {
		case <-px.Closing():
			return
		case k, ok := <-bs.provideKeys:
			if !ok {
				log.Debug("provideKeys channel closed")
				return
			}
113 114 115
			select {
			case <-px.Closing():
				return
116 117
			case limit <- struct{}{}:
				go limitedGoProvide(k, wid)
118 119
			}
		}
120
	}
121 122
}

123 124
func (bs *Bitswap) provideCollector(ctx context.Context) {
	defer close(bs.provideKeys)
125 126 127
	var toProvide []key.Key
	var nextKey key.Key
	var keysOut chan key.Key
128 129 130 131 132 133 134 135

	for {
		select {
		case blk, ok := <-bs.newBlocks:
			if !ok {
				log.Debug("newBlocks channel closed")
				return
			}
Jeromy's avatar
Jeromy committed
136 137 138 139
			if keysOut == nil {
				nextKey = blk.Key()
				keysOut = bs.provideKeys
			} else {
Jeromy's avatar
Jeromy committed
140
				toProvide = append(toProvide, blk.Key())
Jeromy's avatar
Jeromy committed
141 142
			}
		case keysOut <- nextKey:
Jeromy's avatar
Jeromy committed
143 144 145
			if len(toProvide) > 0 {
				nextKey = toProvide[0]
				toProvide = toProvide[1:]
146
			} else {
Jeromy's avatar
Jeromy committed
147
				keysOut = nil
148 149 150 151 152 153 154
			}
		case <-ctx.Done():
			return
		}
	}
}

155
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
156 157 158
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

159 160 161 162 163
	broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
	defer broadcastSignal.Stop()

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()
164 165

	for {
166
		log.Event(ctx, "Bitswap.Rebroadcast.idle")
167
		select {
168 169
		case <-tick.C:
			n := bs.wm.wl.Len()
170
			if n > 0 {
Jeromy's avatar
Jeromy committed
171
				log.Debug(n, "keys in bitswap wantlist")
172
			}
173
		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
174
			log.Event(ctx, "Bitswap.Rebroadcast.active")
175
			for _, e := range bs.wm.wl.Entries() {
Jeromy's avatar
Jeromy committed
176
				e := e
177
				bs.findKeys <- &e
178 179 180 181 182 183
			}
		case <-parent.Done():
			return
		}
	}
}
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221

// providerQueryManager receives wantlist entries from bs.findKeys and, for
// each key that does not already have a query in flight, starts a goroutine
// that looks up providers via bs.network.FindProvidersAsync and tries to
// connect to every provider found. Each query is bounded by
// providerRequestTimeout on top of the entry's own context. The manager
// returns when ctx is done.
func (bs *Bitswap) providerQueryManager(ctx context.Context) {
	var activeLk sync.Mutex
	// active holds the keys with a query in flight, so that repeated
	// requests for the same key are dropped.
	active := make(map[key.Key]*wantlist.Entry)

	for {
		select {
		case e := <-bs.findKeys:
			activeLk.Lock()
			if _, ok := active[e.Key]; ok {
				activeLk.Unlock()
				continue
			}
			active[e.Key] = e
			activeLk.Unlock()

			go func(e *wantlist.Entry) {
				child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
				defer cancel()
				providers := bs.network.FindProvidersAsync(child, e.Key, maxProvidersPerRequest)

				// Wait for every connection attempt before returning.
				// Otherwise the deferred cancel fires as soon as the
				// providers channel is drained and aborts dials that are
				// still in progress on child.
				var wg sync.WaitGroup
				for p := range providers {
					wg.Add(1)
					go func(p peer.ID) {
						defer wg.Done()
						err := bs.network.ConnectTo(child, p)
						if err != nil {
							log.Debugf("failed to connect to provider %s: %s", p, err)
						}
					}(p)
				}
				wg.Wait()

				activeLk.Lock()
				delete(active, e.Key)
				activeLk.Unlock()
			}(e)

		case <-ctx.Done():
			return
		}
	}
}