bitswap.go 7.92 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// Package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4 5
package bitswap

import (
Jeromy's avatar
Jeromy committed
6 7
	"time"

8
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
9
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11
	blocks "github.com/jbenet/go-ipfs/blocks"
12
	blockstore "github.com/jbenet/go-ipfs/blockstore"
13 14 15 16 17
	exchange "github.com/jbenet/go-ipfs/exchange"
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
18
	peer "github.com/jbenet/go-ipfs/peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22 23
// log is the package-level logger, obtained from u.Logger under the name
// "bitswap".
var log = u.Logger("bitswap")

24 25 26
// New initializes a BitSwap instance that communicates over the
// provided BitSwapNetwork. This function registers the returned instance as
// the network delegate.
27
// Runs until context is cancelled
28 29
func New(ctx context.Context, p peer.Peer,
	network bsnet.BitSwapNetwork, routing bsnet.Routing,
30
	d ds.ThreadSafeDatastore, nice bool) exchange.Interface {
31

32 33
	notif := notifications.New()
	go func() {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
34 35
		<-ctx.Done()
		notif.Shutdown()
36 37
	}()

38 39
	bs := &bitswap{
		blockstore:    blockstore.NewBlockstore(d),
40
		notifications: notif,
41
		strategy:      strategy.New(nice),
42
		routing:       routing,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
43
		sender:        network,
44
		wantlist:      u.NewKeySet(),
Jeromy's avatar
Jeromy committed
45
		blockReq:      make(chan u.Key, 32),
46
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47
	network.SetDelegate(bs)
Jeromy's avatar
Jeromy committed
48
	go bs.run(ctx)
49 50 51 52

	return bs
}

53 54
// bitswap instances implement the bitswap protocol.
type bitswap struct {
55

56
	// sender delivers messages on behalf of the session
57
	sender bsnet.BitSwapNetwork
58

59
	// blockstore is the local database
60
	// NB: ensure threadsafety
61
	blockstore blockstore.Blockstore
62

63
	// routing interface for communication
64
	routing bsnet.Routing
65

66
	notifications notifications.PubSub
67

Jeromy's avatar
Jeromy committed
68 69
	blockReq chan u.Key

70
	// strategy listens to network traffic and makes decisions about how to
71
	// interact with partners.
72 73
	// TODO(brian): save the strategy's state to the datastore
	strategy strategy.Strategy
74

75
	wantlist u.KeySet
76 77
}

78 79
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context
80 81
//
// TODO ensure only one active request per key
Jeromy's avatar
Jeromy committed
82
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
83
	log.Debugf("Get Block %v", k)
Jeromy's avatar
Jeromy committed
84 85
	now := time.Now()
	defer func() {
86
		log.Debugf("GetBlock took %f secs", time.Now().Sub(now).Seconds())
Jeromy's avatar
Jeromy committed
87
	}()
88

89
	ctx, cancelFunc := context.WithCancel(parent)
90 91
	defer cancelFunc()

92
	bs.wantlist.Add(k)
93
	promise := bs.notifications.Subscribe(ctx, k)
94

Jeromy's avatar
Jeromy committed
95 96 97 98 99
	select {
	case bs.blockReq <- k:
	case <-parent.Done():
		return nil, parent.Err()
	}
100 101

	select {
102
	case block := <-promise:
103
		bs.wantlist.Remove(k)
104
		return &block, nil
105 106
	case <-parent.Done():
		return nil, parent.Err()
107 108 109
	}
}

Jeromy's avatar
Jeromy committed
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
// GetBlocks is an unimplemented placeholder for fetching several keys at
// once. It ignores its arguments and always returns a nil block together
// with a nil error.
//
// NOTE(review): callers cannot tell "not implemented" from success here and
// must guard against the nil block; consider returning an error instead.
func (bs *bitswap) GetBlocks(parent context.Context, ks []u.Key) (*blocks.Block, error) {
	// TODO: something smart
	return nil, nil
}

// sendWantListTo sends the current wantlist to every peer received on peers.
//
// A single message holding all wanted keys is built up front. For each peer
// a goroutine dials the peer, sends the message as a request, records the
// send with the strategy, and passes any non-nil response to
// ReceiveMessage.
//
// The call blocks until the peers channel is closed; a nil channel would
// therefore block forever. Per-peer failures are logged rather than
// returned, so the returned error is always nil.
//
// NOTE(review): the (peer, message) reply produced by ReceiveMessage is
// discarded here — confirm that is intended.
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
	message := bsmsg.New()
	for _, wanted := range bs.wantlist.Keys() {
		message.AddWanted(wanted)
	}
	for peerToQuery := range peers {
		log.Debugf("bitswap got peersToQuery: %s", peerToQuery)
		// The peer is passed as an argument so each goroutine works on its
		// own copy rather than on the shared loop variable.
		go func(p peer.Peer) {

			log.Debugf("bitswap dialing peer: %s", p)
			if err := bs.sender.DialPeer(ctx, p); err != nil {
				// Include the cause; it was previously dropped from the log.
				log.Errorf("Error sender.DialPeer(%s) = %s", p, err)
				return
			}

			response, err := bs.sender.SendRequest(ctx, p, message)
			if err != nil {
				log.Errorf("Error sender.SendRequest(%s) = %s", p, err)
				return
			}
			// FIXME ensure accounting is handled correctly when
			// communication fails. May require slightly different API to
			// get better guarantees. May need shared sequence numbers.
			bs.strategy.MessageSent(p, message)

			if response == nil {
				return
			}
			bs.ReceiveMessage(ctx, p, response)
		}(peerToQuery)
	}
	return nil
}

// run is the background loop that decides when the wantlist is sent out.
// It returns when ctx is done.
//
// Keys arriving on blockReq are batched: the first key of a batch starts a
// provider lookup, and the wantlist is flushed to those providers either
// once threshold keys have accumulated or after bufferTime passes without a
// flush. Independently of new requests, the wantlist is re-sent every
// rebroadcastTime to providers of one of its keys.
//
// Invariant: sendlist is non-nil exactly while unsent > 0. Every flush
// resets both, so sendWantListTo is never handed a nil channel (ranging
// over one would block this loop forever).
func (bs *bitswap) run(ctx context.Context) {
	const (
		// Every so often, we should resend out our current want list
		rebroadcastTime = 5 * time.Second

		// Time to wait before sending out wantlists to better batch up
		// requests
		bufferTime = 3 * time.Millisecond

		// Number of providers requested per lookup.
		peersPerSend = 6

		// Number of batched requests that triggers an immediate flush.
		threshold = 10
	)

	var sendlist <-chan peer.Peer
	unsent := 0
	timeout := time.After(rebroadcastTime)

	for {
		select {
		case <-timeout:
			if sendlist == nil {
				keys := bs.wantlist.Keys()
				if len(keys) == 0 {
					// Nothing is wanted, so there is nothing to
					// rebroadcast; indexing keys[0] would panic.
					timeout = time.After(rebroadcastTime)
					continue
				}
				// Any key will do; the original author relies on the key
				// order being semi-random.
				sendlist = bs.routing.FindProvidersAsync(ctx, keys[0], peersPerSend)
			}
			if err := bs.sendWantListTo(ctx, sendlist); err != nil {
				log.Errorf("error sending wantlist: %s", err)
			}
			// The pending batch (if any) went out with this send, so start
			// a fresh one. Leaving unsent non-zero here would let a later
			// threshold flush run with a nil sendlist.
			sendlist = nil
			unsent = 0
			timeout = time.After(rebroadcastTime)

		case k := <-bs.blockReq:
			if unsent == 0 {
				sendlist = bs.routing.FindProvidersAsync(ctx, k, peersPerSend)
			}
			unsent++

			if unsent >= threshold {
				// Enough requests are batched: send wantlist to sendlist.
				if err := bs.sendWantListTo(ctx, sendlist); err != nil {
					log.Errorf("error sending wantlist: %s", err)
				}
				sendlist = nil
				unsent = 0
				timeout = time.After(rebroadcastTime)
			} else {
				// Wait briefly for more requests before sending the
				// current wantlist.
				timeout = time.After(bufferTime)
			}

		case <-ctx.Done():
			return
		}
	}
}

200 201
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
202
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
203
	log.Debugf("Has Block %v", blk.Key())
204
	bs.wantlist.Remove(blk.Key())
205
	bs.sendToPeersThatWant(ctx, blk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
206
	return bs.routing.Provide(ctx, blk.Key())
207 208
}

209
// TODO(brian): handle errors
210 211
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
	peer.Peer, bsmsg.BitSwapMessage) {
212 213
	log.Debugf("ReceiveMessage from %v", p.Key())
	log.Debugf("Message wantlist: %v", incoming.Wantlist())
214

215
	if p == nil {
216
		log.Error("Received message from nil peer!")
217 218
		// TODO propagate the error upward
		return nil, nil
219 220
	}
	if incoming == nil {
221
		log.Error("Got nil bitswap message!")
222 223
		// TODO propagate the error upward
		return nil, nil
224
	}
225

226 227 228
	// Record message bytes in ledger
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
229
	bs.strategy.MessageReceived(p, incoming) // FIRST
230

231
	for _, block := range incoming.Blocks() {
232
		// TODO verify blocks?
233
		if err := bs.blockstore.Put(&block); err != nil {
234
			continue // FIXME(brian): err ignored
235
		}
236 237 238 239 240
		bs.notifications.Publish(block)
		err := bs.HasBlock(ctx, block)
		if err != nil {
			log.Warningf("HasBlock errored: %s", err)
		}
241 242
	}

243 244
	message := bsmsg.New()
	for _, wanted := range bs.wantlist.Keys() {
245
		message.AddWanted(wanted)
246
	}
247
	for _, key := range incoming.Wantlist() {
248 249
		// TODO: might be better to check if we have the block before checking
		//			if we should send it to someone
250
		if bs.strategy.ShouldSendBlockToPeer(key, p) {
251 252 253
			if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
				continue
			} else {
254
				message.AddBlock(*block)
255
			}
256 257
		}
	}
258

Jeromy's avatar
Jeromy committed
259
	bs.strategy.MessageSent(p, message)
260
	log.Debug("Returning message.")
261 262 263 264
	return p, message
}

func (bs *bitswap) ReceiveError(err error) {
265
	log.Errorf("Bitswap ReceiveError: %s", err)
266 267
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
268
}
269

270 271
// send strives to ensure that accounting is always performed when a message is
// sent
272
func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
273
	bs.sender.SendMessage(ctx, p, m)
274
	bs.strategy.MessageSent(p, m)
275 276
}

277
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
278
	log.Debugf("Sending %v to peers that want it", block.Key())
279

280 281
	for _, p := range bs.strategy.Peers() {
		if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
282
			log.Debugf("%v wants %v", p, block.Key())
283 284
			if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
				message := bsmsg.New()
285
				message.AddBlock(block)
286
				for _, wanted := range bs.wantlist.Keys() {
287
					message.AddWanted(wanted)
288
				}
289
				bs.send(ctx, p, message)
290 291 292 293
			}
		}
	}
}