bitswap.go 8.36 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// Package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
3 4 5
package bitswap

import (
	"errors"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"

	blocks "github.com/jbenet/go-ipfs/blocks"
	blockstore "github.com/jbenet/go-ipfs/blockstore"
	exchange "github.com/jbenet/go-ipfs/exchange"
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
	"github.com/jbenet/go-ipfs/util/eventlog"
)

23
// log is the package-level event logger, created under the name "bitswap".
var log = eventlog.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24

25 26 27
// New initializes a BitSwap instance that communicates over the
// provided BitSwapNetwork. This function registers the returned instance as
// the network delegate.
28
// Runs until context is cancelled
29 30
func New(ctx context.Context, p peer.Peer,
	network bsnet.BitSwapNetwork, routing bsnet.Routing,
31
	d ds.ThreadSafeDatastore, nice bool) exchange.Interface {
32

33 34
	notif := notifications.New()
	go func() {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
35 36
		<-ctx.Done()
		notif.Shutdown()
37 38
	}()

39 40
	bs := &bitswap{
		blockstore:    blockstore.NewBlockstore(d),
41
		notifications: notif,
42
		strategy:      strategy.New(nice),
43
		routing:       routing,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44
		sender:        network,
45
		wantlist:      u.NewKeySet(),
Jeromy's avatar
Jeromy committed
46
		blockReq:      make(chan u.Key, 32),
47
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
48
	network.SetDelegate(bs)
Jeromy's avatar
Jeromy committed
49
	go bs.run(ctx)
50 51 52 53

	return bs
}

54 55 56 57
// bitswap instances implement the bitswap protocol.
type bitswap struct {

	// sender delivers messages on behalf of the session
58
	sender bsnet.BitSwapNetwork
59 60 61 62 63 64

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

	// routing interface for communication
65
	routing bsnet.Routing
66 67 68

	notifications notifications.PubSub

Jeromy's avatar
Jeromy committed
69 70
	blockReq chan u.Key

71 72 73 74
	// strategy listens to network traffic and makes decisions about how to
	// interact with partners.
	// TODO(brian): save the strategy's state to the datastore
	strategy strategy.Strategy
75

76
	wantlist u.KeySet
77 78
}

79 80
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context
81 82
//
// TODO ensure only one active request per key
Jeromy's avatar
Jeromy committed
83
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
84 85 86 87

	// make sure to derive a new |ctx| and pass it to children. It's correct to
	// listen on |parent| here, but incorrect to pass |parent| to new async
	// functions. This is difficult to enforce. May this comment keep you safe.
88

89
	ctx, cancelFunc := context.WithCancel(parent)
90 91
	defer cancelFunc()

92 93
	ctx = eventlog.ContextWithMetadata(ctx, eventlog.Uuid("GetBlockRequest"))
	log.Event(ctx, "GetBlockRequestBegin", &k)
94 95

	defer func() {
96
		log.Event(ctx, "GetBlockRequestEnd", &k)
97 98
	}()

99
	bs.wantlist.Add(k)
100
	promise := bs.notifications.Subscribe(ctx, k)
101

Jeromy's avatar
Jeromy committed
102 103 104 105 106
	select {
	case bs.blockReq <- k:
	case <-parent.Done():
		return nil, parent.Err()
	}
107 108

	select {
109
	case block := <-promise:
110
		bs.wantlist.Remove(k)
111
		return &block, nil
112 113
	case <-parent.Done():
		return nil, parent.Err()
114 115 116
	}
}

Jeromy's avatar
Jeromy committed
117 118 119 120 121 122 123 124 125 126 127
// GetBlocks is a placeholder for retrieving several blocks at once. It is
// not implemented yet and always returns a nil block together with a non-nil
// error, so that callers checking only err never dereference a nil block.
func (bs *bitswap) GetBlocks(parent context.Context, ks []u.Key) (*blocks.Block, error) {
	// TODO: something smart
	return nil, errors.New("bitswap: GetBlocks is not implemented")
}

func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
	message := bsmsg.New()
	for _, wanted := range bs.wantlist.Keys() {
		message.AddWanted(wanted)
	}
	for peerToQuery := range peers {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
128
		log.Event(ctx, "PeerToQuery", peerToQuery)
Jeromy's avatar
Jeromy committed
129 130
		go func(p peer.Peer) {

Brian Tiger Chow's avatar
Brian Tiger Chow committed
131
			log.Event(ctx, "DialPeer", p)
Jeromy's avatar
Jeromy committed
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
			err := bs.sender.DialPeer(ctx, p)
			if err != nil {
				log.Errorf("Error sender.DialPeer(%s)", p)
				return
			}

			response, err := bs.sender.SendRequest(ctx, p, message)
			if err != nil {
				log.Errorf("Error sender.SendRequest(%s) = %s", p, err)
				return
			}
			// FIXME ensure accounting is handled correctly when
			// communication fails. May require slightly different API to
			// get better guarantees. May need shared sequence numbers.
			bs.strategy.MessageSent(p, message)

			if response == nil {
				return
			}
			bs.ReceiveMessage(ctx, p, response)
		}(peerToQuery)
	}
	return nil
}

func (bs *bitswap) run(ctx context.Context) {
	var sendlist <-chan peer.Peer

	// Every so often, we should resend out our current want list
	rebroadcastTime := time.Second * 5

	// Time to wait before sending out wantlists to better batch up requests
	bufferTime := time.Millisecond * 3
	peersPerSend := 6

	timeout := time.After(rebroadcastTime)
	threshold := 10
	unsent := 0
	for {
		select {
		case <-timeout:
Jeromy's avatar
Jeromy committed
173 174 175 176
			wantlist := bs.wantlist.Keys()
			if len(wantlist) == 0 {
				continue
			}
Jeromy's avatar
Jeromy committed
177 178
			if sendlist == nil {
				// rely on semi randomness of maps
Jeromy's avatar
Jeromy committed
179
				firstKey := wantlist[0]
Jeromy's avatar
Jeromy committed
180 181 182 183
				sendlist = bs.routing.FindProvidersAsync(ctx, firstKey, 6)
			}
			err := bs.sendWantListTo(ctx, sendlist)
			if err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
184
				log.Errorf("error sending wantlist: %s", err)
Jeromy's avatar
Jeromy committed
185 186 187 188 189 190 191 192 193 194 195
			}
			sendlist = nil
			timeout = time.After(rebroadcastTime)
		case k := <-bs.blockReq:
			if unsent == 0 {
				sendlist = bs.routing.FindProvidersAsync(ctx, k, peersPerSend)
			}
			unsent++

			if unsent >= threshold {
				// send wantlist to sendlist
Brian Tiger Chow's avatar
Brian Tiger Chow committed
196 197 198 199
				err := bs.sendWantListTo(ctx, sendlist)
				if err != nil {
					log.Errorf("error sending wantlist: %s", err)
				}
Jeromy's avatar
Jeromy committed
200 201 202 203 204 205 206 207 208 209 210 211 212 213
				unsent = 0
				timeout = time.After(rebroadcastTime)
				sendlist = nil
			} else {
				// set a timeout to wait for more blocks or send current wantlist

				timeout = time.After(bufferTime)
			}
		case <-ctx.Done():
			return
		}
	}
}

214 215
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
216
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
217
	log.Debugf("Has Block %v", blk.Key())
218
	bs.wantlist.Remove(blk.Key())
219
	bs.sendToPeersThatWant(ctx, blk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220
	return bs.routing.Provide(ctx, blk.Key())
221 222 223
}

// TODO(brian): handle errors
224 225
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
	peer.Peer, bsmsg.BitSwapMessage) {
226 227
	log.Debugf("ReceiveMessage from %v", p.Key())
	log.Debugf("Message wantlist: %v", incoming.Wantlist())
228

229
	if p == nil {
230
		log.Error("Received message from nil peer!")
231 232
		// TODO propagate the error upward
		return nil, nil
233 234
	}
	if incoming == nil {
235
		log.Error("Got nil bitswap message!")
236 237
		// TODO propagate the error upward
		return nil, nil
238
	}
239

240 241 242
	// Record message bytes in ledger
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
243
	bs.strategy.MessageReceived(p, incoming) // FIRST
244

245
	for _, block := range incoming.Blocks() {
246
		// TODO verify blocks?
247
		if err := bs.blockstore.Put(&block); err != nil {
248
			continue // FIXME(brian): err ignored
249
		}
250 251 252 253 254
		bs.notifications.Publish(block)
		err := bs.HasBlock(ctx, block)
		if err != nil {
			log.Warningf("HasBlock errored: %s", err)
		}
255 256
	}

257 258
	message := bsmsg.New()
	for _, wanted := range bs.wantlist.Keys() {
259
		message.AddWanted(wanted)
260
	}
261
	for _, key := range incoming.Wantlist() {
262 263
		// TODO: might be better to check if we have the block before checking
		//			if we should send it to someone
264
		if bs.strategy.ShouldSendBlockToPeer(key, p) {
265 266 267
			if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
				continue
			} else {
268
				message.AddBlock(*block)
269 270 271
			}
		}
	}
272

Jeromy's avatar
Jeromy committed
273
	bs.strategy.MessageSent(p, message)
274
	log.Debug("Returning message.")
275 276 277 278
	return p, message
}

func (bs *bitswap) ReceiveError(err error) {
279
	log.Errorf("Bitswap ReceiveError: %s", err)
280 281
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
282 283
}

284 285
// send strives to ensure that accounting is always performed when a message is
// sent
286
func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
287
	bs.sender.SendMessage(ctx, p, m)
288
	bs.strategy.MessageSent(p, m)
289 290
}

291
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
292
	log.Debugf("Sending %v to peers that want it", block.Key())
293

294 295
	for _, p := range bs.strategy.Peers() {
		if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
296
			log.Debugf("%v wants %v", p, block.Key())
297 298
			if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
				message := bsmsg.New()
299
				message.AddBlock(block)
300
				for _, wanted := range bs.wantlist.Keys() {
301
					message.AddWanted(wanted)
302
				}
303
				bs.send(ctx, p, message)
304 305 306 307
			}
		}
	}
}