bitswap.go 8.35 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// Package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4 5
package bitswap

import (
Jeromy's avatar
Jeromy committed
6 7
	"time"

8
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
9
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11
	blocks "github.com/jbenet/go-ipfs/blocks"
12
	blockstore "github.com/jbenet/go-ipfs/blockstore"
13 14 15 16 17
	exchange "github.com/jbenet/go-ipfs/exchange"
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
18
	peer "github.com/jbenet/go-ipfs/peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19
	u "github.com/jbenet/go-ipfs/util"
20
	"github.com/jbenet/go-ipfs/util/eventlog"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21 22
)

23
var log = eventlog.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24

25 26 27
// New initializes a BitSwap instance that communicates over the
// provided BitSwapNetwork. This function registers the returned instance as
// the network delegate.
28
// Runs until context is cancelled
29 30
func New(ctx context.Context, p peer.Peer,
	network bsnet.BitSwapNetwork, routing bsnet.Routing,
31
	d ds.ThreadSafeDatastore, nice bool) exchange.Interface {
32

33 34
	notif := notifications.New()
	go func() {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
35 36
		<-ctx.Done()
		notif.Shutdown()
37 38
	}()

39 40
	bs := &bitswap{
		blockstore:    blockstore.NewBlockstore(d),
41
		notifications: notif,
42
		strategy:      strategy.New(nice),
43
		routing:       routing,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44
		sender:        network,
45
		wantlist:      u.NewKeySet(),
Brian Tiger Chow's avatar
naming  
Brian Tiger Chow committed
46
		blockRequests: make(chan u.Key, 32),
47
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
48
	network.SetDelegate(bs)
Jeromy's avatar
Jeromy committed
49
	go bs.run(ctx)
50 51 52 53

	return bs
}

54 55
// bitswap instances implement the bitswap protocol.
type bitswap struct {
56

57
	// sender delivers messages on behalf of the session
58
	sender bsnet.BitSwapNetwork
59

60
	// blockstore is the local database
61
	// NB: ensure threadsafety
62
	blockstore blockstore.Blockstore
63

64
	// routing interface for communication
65
	routing bsnet.Routing
66

67
	notifications notifications.PubSub
68

Brian Tiger Chow's avatar
naming  
Brian Tiger Chow committed
69
	blockRequests chan u.Key
Jeromy's avatar
Jeromy committed
70

71
	// strategy listens to network traffic and makes decisions about how to
72
	// interact with partners.
73 74
	// TODO(brian): save the strategy's state to the datastore
	strategy strategy.Strategy
75

76
	wantlist u.KeySet
77 78
}

79 80
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context
81 82
//
// TODO ensure only one active request per key
Jeromy's avatar
Jeromy committed
83
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
84 85 86 87

	// make sure to derive a new |ctx| and pass it to children. It's correct to
	// listen on |parent| here, but incorrect to pass |parent| to new async
	// functions. This is difficult to enforce. May this comment keep you safe.
88

89
	ctx, cancelFunc := context.WithCancel(parent)
90 91
	defer cancelFunc()

92 93
	ctx = eventlog.ContextWithMetadata(ctx, eventlog.Uuid("GetBlockRequest"))
	log.Event(ctx, "GetBlockRequestBegin", &k)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94
	defer log.Event(ctx, "GetBlockRequestEnd", &k)
95

96
	bs.wantlist.Add(k)
97
	promise := bs.notifications.Subscribe(ctx, k)
98

Jeromy's avatar
Jeromy committed
99
	select {
Brian Tiger Chow's avatar
naming  
Brian Tiger Chow committed
100
	case bs.blockRequests <- k:
Jeromy's avatar
Jeromy committed
101 102 103
	case <-parent.Done():
		return nil, parent.Err()
	}
104 105

	select {
106
	case block := <-promise:
107
		bs.wantlist.Remove(k)
108
		return &block, nil
109 110
	case <-parent.Done():
		return nil, parent.Err()
111 112 113
	}
}

Jeromy's avatar
Jeromy committed
114 115 116 117 118 119 120 121 122 123 124
// GetBlocks is a placeholder for fetching several keys at once. It does not
// look at its arguments and currently yields a nil block together with a nil
// error.
//
// NOTE(review): callers cannot tell this result apart from success — confirm
// nothing depends on it before the real implementation lands.
func (bs *bitswap) GetBlocks(parent context.Context, ks []u.Key) (*blocks.Block, error) {
	// TODO: something smart
	var none *blocks.Block
	return none, nil
}

func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
	message := bsmsg.New()
	for _, wanted := range bs.wantlist.Keys() {
		message.AddWanted(wanted)
	}
	for peerToQuery := range peers {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
125
		log.Event(ctx, "PeerToQuery", peerToQuery)
Jeromy's avatar
Jeromy committed
126 127
		go func(p peer.Peer) {

Brian Tiger Chow's avatar
Brian Tiger Chow committed
128
			log.Event(ctx, "DialPeer", p)
Jeromy's avatar
Jeromy committed
129 130
			err := bs.sender.DialPeer(ctx, p)
			if err != nil {
131
				log.Errorf("Error sender.DialPeer(%s): %s", p, err)
Jeromy's avatar
Jeromy committed
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
				return
			}

			response, err := bs.sender.SendRequest(ctx, p, message)
			if err != nil {
				log.Errorf("Error sender.SendRequest(%s) = %s", p, err)
				return
			}
			// FIXME ensure accounting is handled correctly when
			// communication fails. May require slightly different API to
			// get better guarantees. May need shared sequence numbers.
			bs.strategy.MessageSent(p, message)

			if response == nil {
				return
			}
			bs.ReceiveMessage(ctx, p, response)
		}(peerToQuery)
	}
	return nil
}

func (bs *bitswap) run(ctx context.Context) {

156 157
	// Every so often, we should resend out our current want list
	rebroadcastTime := time.Second * 5
Brian Tiger Chow's avatar
Brian Tiger Chow committed
158

Brian Tiger Chow's avatar
Brian Tiger Chow committed
159
	var providers <-chan peer.Peer // NB: must be initialized to zero value
Brian Tiger Chow's avatar
Brian Tiger Chow committed
160
	broadcastSignal := time.After(rebroadcastPeriod)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
161
	unsentKeys := 0
Brian Tiger Chow's avatar
Brian Tiger Chow committed
162

Jeromy's avatar
Jeromy committed
163 164
	for {
		select {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
165
		case <-broadcastSignal:
Jeromy's avatar
Jeromy committed
166 167 168 169
			wantlist := bs.wantlist.Keys()
			if len(wantlist) == 0 {
				continue
			}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
170
			if providers == nil {
Jeromy's avatar
Jeromy committed
171
				// rely on semi randomness of maps
Jeromy's avatar
Jeromy committed
172
				firstKey := wantlist[0]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
173
				providers = bs.routing.FindProvidersAsync(ctx, firstKey, 6)
Jeromy's avatar
Jeromy committed
174
			}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
175
			err := bs.sendWantListTo(ctx, providers)
Jeromy's avatar
Jeromy committed
176
			if err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
177
				log.Errorf("error sending wantlist: %s", err)
Jeromy's avatar
Jeromy committed
178
			}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
179
			providers = nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
180
			broadcastSignal = time.After(rebroadcastPeriod)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
181

Brian Tiger Chow's avatar
naming  
Brian Tiger Chow committed
182
		case k := <-bs.blockRequests:
Brian Tiger Chow's avatar
Brian Tiger Chow committed
183 184
			if unsentKeys == 0 {
				providers = bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest)
Jeromy's avatar
Jeromy committed
185
			}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
186
			unsentKeys++
Jeromy's avatar
Jeromy committed
187

Brian Tiger Chow's avatar
Brian Tiger Chow committed
188 189 190
			if unsentKeys >= numKeysPerBatch {
				// send wantlist to providers
				err := bs.sendWantListTo(ctx, providers)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
191 192 193
				if err != nil {
					log.Errorf("error sending wantlist: %s", err)
				}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
194
				unsentKeys = 0
Brian Tiger Chow's avatar
Brian Tiger Chow committed
195
				broadcastSignal = time.After(rebroadcastPeriod)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
196
				providers = nil
Jeromy's avatar
Jeromy committed
197 198 199
			} else {
				// set a timeout to wait for more blocks or send current wantlist

Brian Tiger Chow's avatar
Brian Tiger Chow committed
200
				broadcastSignal = time.After(batchDelay)
Jeromy's avatar
Jeromy committed
201 202 203 204 205 206 207
			}
		case <-ctx.Done():
			return
		}
	}
}

208 209
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
210
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
211
	log.Debugf("Has Block %v", blk.Key())
212
	bs.wantlist.Remove(blk.Key())
213
	bs.sendToPeersThatWant(ctx, blk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214
	return bs.routing.Provide(ctx, blk.Key())
215 216
}

217
// TODO(brian): handle errors
218 219
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
	peer.Peer, bsmsg.BitSwapMessage) {
220 221
	log.Debugf("ReceiveMessage from %v", p.Key())
	log.Debugf("Message wantlist: %v", incoming.Wantlist())
222

223
	if p == nil {
224
		log.Error("Received message from nil peer!")
225 226
		// TODO propagate the error upward
		return nil, nil
227 228
	}
	if incoming == nil {
229
		log.Error("Got nil bitswap message!")
230 231
		// TODO propagate the error upward
		return nil, nil
232
	}
233

234 235 236
	// Record message bytes in ledger
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
237
	bs.strategy.MessageReceived(p, incoming) // FIRST
238

239
	for _, block := range incoming.Blocks() {
240
		// TODO verify blocks?
241
		if err := bs.blockstore.Put(&block); err != nil {
242
			continue // FIXME(brian): err ignored
243
		}
244 245 246 247 248
		bs.notifications.Publish(block)
		err := bs.HasBlock(ctx, block)
		if err != nil {
			log.Warningf("HasBlock errored: %s", err)
		}
249 250
	}

251 252
	message := bsmsg.New()
	for _, wanted := range bs.wantlist.Keys() {
253
		message.AddWanted(wanted)
254
	}
255
	for _, key := range incoming.Wantlist() {
256 257
		// TODO: might be better to check if we have the block before checking
		//			if we should send it to someone
258
		if bs.strategy.ShouldSendBlockToPeer(key, p) {
259 260 261
			if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
				continue
			} else {
262
				message.AddBlock(*block)
263
			}
264 265
		}
	}
266

Jeromy's avatar
Jeromy committed
267
	bs.strategy.MessageSent(p, message)
268
	log.Debug("Returning message.")
269 270 271 272
	return p, message
}

func (bs *bitswap) ReceiveError(err error) {
273
	log.Errorf("Bitswap ReceiveError: %s", err)
274 275
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
276
}
277

278 279
// send strives to ensure that accounting is always performed when a message is
// sent
280
func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
281
	bs.sender.SendMessage(ctx, p, m)
282
	bs.strategy.MessageSent(p, m)
283 284
}

285
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
286
	log.Debugf("Sending %v to peers that want it", block.Key())
287

288 289
	for _, p := range bs.strategy.Peers() {
		if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
290
			log.Debugf("%v wants %v", p, block.Key())
291 292
			if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
				message := bsmsg.New()
293
				message.AddBlock(block)
294
				for _, wanted := range bs.wantlist.Keys() {
295
					message.AddWanted(wanted)
296
				}
297
				bs.send(ctx, p, message)
298 299 300 301
			}
		}
	}
}