bitswap.go 8.46 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// Package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
3 4 5
package bitswap

import (
Jeromy's avatar
Jeromy committed
6 7
	"time"

8
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
9
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
10 11 12 13 14 15 16 17 18 19

	blocks "github.com/jbenet/go-ipfs/blocks"
	blockstore "github.com/jbenet/go-ipfs/blockstore"
	exchange "github.com/jbenet/go-ipfs/exchange"
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
20
	"github.com/jbenet/go-ipfs/util/eventlog"
21 22
)

23
var log = eventlog.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24

25 26 27
// New initializes a BitSwap instance that communicates over the
// provided BitSwapNetwork. This function registers the returned instance as
// the network delegate.
28
// Runs until context is cancelled
29 30
func New(ctx context.Context, p peer.Peer,
	network bsnet.BitSwapNetwork, routing bsnet.Routing,
31
	d ds.ThreadSafeDatastore, nice bool) exchange.Interface {
32

33 34
	notif := notifications.New()
	go func() {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
35 36
		<-ctx.Done()
		notif.Shutdown()
37 38
	}()

39 40
	bs := &bitswap{
		blockstore:    blockstore.NewBlockstore(d),
41
		notifications: notif,
42
		strategy:      strategy.New(nice),
43
		routing:       routing,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44
		sender:        network,
45
		wantlist:      u.NewKeySet(),
Brian Tiger Chow's avatar
naming  
Brian Tiger Chow committed
46
		blockRequests: make(chan u.Key, 32),
47
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
48
	network.SetDelegate(bs)
Jeromy's avatar
Jeromy committed
49
	go bs.run(ctx)
50 51 52 53

	return bs
}

54 55 56 57
// bitswap instances implement the bitswap protocol.
type bitswap struct {

	// sender delivers messages on behalf of the session
58
	sender bsnet.BitSwapNetwork
59 60 61 62 63 64

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

	// routing interface for communication
65
	routing bsnet.Routing
66 67 68

	notifications notifications.PubSub

Brian Tiger Chow's avatar
naming  
Brian Tiger Chow committed
69
	blockRequests chan u.Key
Jeromy's avatar
Jeromy committed
70

71 72 73 74
	// strategy listens to network traffic and makes decisions about how to
	// interact with partners.
	// TODO(brian): save the strategy's state to the datastore
	strategy strategy.Strategy
75

76
	wantlist u.KeySet
77 78
}

79 80
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context
81 82
//
// TODO ensure only one active request per key
Jeromy's avatar
Jeromy committed
83
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
84 85 86 87

	// make sure to derive a new |ctx| and pass it to children. It's correct to
	// listen on |parent| here, but incorrect to pass |parent| to new async
	// functions. This is difficult to enforce. May this comment keep you safe.
88

89
	ctx, cancelFunc := context.WithCancel(parent)
90 91
	defer cancelFunc()

92 93
	ctx = eventlog.ContextWithMetadata(ctx, eventlog.Uuid("GetBlockRequest"))
	log.Event(ctx, "GetBlockRequestBegin", &k)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94
	defer log.Event(ctx, "GetBlockRequestEnd", &k)
95

96
	bs.wantlist.Add(k)
97
	promise := bs.notifications.Subscribe(ctx, k)
98

Jeromy's avatar
Jeromy committed
99
	select {
Brian Tiger Chow's avatar
naming  
Brian Tiger Chow committed
100
	case bs.blockRequests <- k:
Jeromy's avatar
Jeromy committed
101 102 103
	case <-parent.Done():
		return nil, parent.Err()
	}
104 105

	select {
106
	case block := <-promise:
107
		bs.wantlist.Remove(k)
108
		return &block, nil
109 110
	case <-parent.Done():
		return nil, parent.Err()
111 112 113
	}
}

Jeromy's avatar
Jeromy committed
114 115 116 117 118 119 120 121 122 123 124
// GetBlocks is an unimplemented placeholder for fetching several keys at
// once. It ignores its arguments and always returns a nil block together with
// a nil error, so callers cannot distinguish this from a successful result and
// must check the block for nil themselves.
func (bs *bitswap) GetBlocks(parent context.Context, ks []u.Key) (*blocks.Block, error) {
	// TODO: something smart
	return nil, nil
}

func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
	message := bsmsg.New()
	for _, wanted := range bs.wantlist.Keys() {
		message.AddWanted(wanted)
	}
	for peerToQuery := range peers {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
125
		log.Event(ctx, "PeerToQuery", peerToQuery)
Jeromy's avatar
Jeromy committed
126 127
		go func(p peer.Peer) {

Brian Tiger Chow's avatar
Brian Tiger Chow committed
128
			log.Event(ctx, "DialPeer", p)
Jeromy's avatar
Jeromy committed
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
			err := bs.sender.DialPeer(ctx, p)
			if err != nil {
				log.Errorf("Error sender.DialPeer(%s)", p)
				return
			}

			response, err := bs.sender.SendRequest(ctx, p, message)
			if err != nil {
				log.Errorf("Error sender.SendRequest(%s) = %s", p, err)
				return
			}
			// FIXME ensure accounting is handled correctly when
			// communication fails. May require slightly different API to
			// get better guarantees. May need shared sequence numbers.
			bs.strategy.MessageSent(p, message)

			if response == nil {
				return
			}
			bs.ReceiveMessage(ctx, p, response)
		}(peerToQuery)
	}
	return nil
}

func (bs *bitswap) run(ctx context.Context) {

Brian Tiger Chow's avatar
Brian Tiger Chow committed
156 157 158
	const rebroadcastPeriod = time.Second * 5 // Every so often, we should resend out our current want list
	const batchDelay = time.Millisecond * 3   // Time to wait before sending out wantlists to better batch up requests
	const peersPerSend = 6
159
	const threshold = 10
Brian Tiger Chow's avatar
Brian Tiger Chow committed
160 161 162

	var sendlist <-chan peer.Peer // NB: must be initialized to zero value
	broadcastSignal := time.After(rebroadcastPeriod)
Jeromy's avatar
Jeromy committed
163
	unsent := 0
Brian Tiger Chow's avatar
Brian Tiger Chow committed
164

Jeromy's avatar
Jeromy committed
165 166
	for {
		select {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
167
		case <-broadcastSignal:
Jeromy's avatar
Jeromy committed
168 169 170 171
			wantlist := bs.wantlist.Keys()
			if len(wantlist) == 0 {
				continue
			}
Jeromy's avatar
Jeromy committed
172 173
			if sendlist == nil {
				// rely on semi randomness of maps
Jeromy's avatar
Jeromy committed
174
				firstKey := wantlist[0]
Jeromy's avatar
Jeromy committed
175 176 177 178
				sendlist = bs.routing.FindProvidersAsync(ctx, firstKey, 6)
			}
			err := bs.sendWantListTo(ctx, sendlist)
			if err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
179
				log.Errorf("error sending wantlist: %s", err)
Jeromy's avatar
Jeromy committed
180 181
			}
			sendlist = nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
182
			broadcastSignal = time.After(rebroadcastPeriod)
Brian Tiger Chow's avatar
naming  
Brian Tiger Chow committed
183
		case k := <-bs.blockRequests:
Jeromy's avatar
Jeromy committed
184 185 186 187 188 189 190
			if unsent == 0 {
				sendlist = bs.routing.FindProvidersAsync(ctx, k, peersPerSend)
			}
			unsent++

			if unsent >= threshold {
				// send wantlist to sendlist
Brian Tiger Chow's avatar
Brian Tiger Chow committed
191 192 193 194
				err := bs.sendWantListTo(ctx, sendlist)
				if err != nil {
					log.Errorf("error sending wantlist: %s", err)
				}
Jeromy's avatar
Jeromy committed
195
				unsent = 0
Brian Tiger Chow's avatar
Brian Tiger Chow committed
196
				broadcastSignal = time.After(rebroadcastPeriod)
Jeromy's avatar
Jeromy committed
197 198 199 200
				sendlist = nil
			} else {
				// set a timeout to wait for more blocks or send current wantlist

Brian Tiger Chow's avatar
Brian Tiger Chow committed
201
				broadcastSignal = time.After(batchDelay)
Jeromy's avatar
Jeromy committed
202 203 204 205 206 207 208
			}
		case <-ctx.Done():
			return
		}
	}
}

209 210
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
211
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
212
	log.Debugf("Has Block %v", blk.Key())
213
	bs.wantlist.Remove(blk.Key())
214
	bs.sendToPeersThatWant(ctx, blk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
215
	return bs.routing.Provide(ctx, blk.Key())
216 217 218
}

// TODO(brian): handle errors
219 220
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
	peer.Peer, bsmsg.BitSwapMessage) {
221 222
	log.Debugf("ReceiveMessage from %v", p.Key())
	log.Debugf("Message wantlist: %v", incoming.Wantlist())
223

224
	if p == nil {
225
		log.Error("Received message from nil peer!")
226 227
		// TODO propagate the error upward
		return nil, nil
228 229
	}
	if incoming == nil {
230
		log.Error("Got nil bitswap message!")
231 232
		// TODO propagate the error upward
		return nil, nil
233
	}
234

235 236 237
	// Record message bytes in ledger
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
238
	bs.strategy.MessageReceived(p, incoming) // FIRST
239

240
	for _, block := range incoming.Blocks() {
241
		// TODO verify blocks?
242
		if err := bs.blockstore.Put(&block); err != nil {
243
			continue // FIXME(brian): err ignored
244
		}
245 246 247 248 249
		bs.notifications.Publish(block)
		err := bs.HasBlock(ctx, block)
		if err != nil {
			log.Warningf("HasBlock errored: %s", err)
		}
250 251
	}

252 253
	message := bsmsg.New()
	for _, wanted := range bs.wantlist.Keys() {
254
		message.AddWanted(wanted)
255
	}
256
	for _, key := range incoming.Wantlist() {
257 258
		// TODO: might be better to check if we have the block before checking
		//			if we should send it to someone
259
		if bs.strategy.ShouldSendBlockToPeer(key, p) {
260 261 262
			if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
				continue
			} else {
263
				message.AddBlock(*block)
264 265 266
			}
		}
	}
267

Jeromy's avatar
Jeromy committed
268
	bs.strategy.MessageSent(p, message)
269
	log.Debug("Returning message.")
270 271 272 273
	return p, message
}

func (bs *bitswap) ReceiveError(err error) {
274
	log.Errorf("Bitswap ReceiveError: %s", err)
275 276
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
277 278
}

279 280
// send strives to ensure that accounting is always performed when a message is
// sent
281
func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
282
	bs.sender.SendMessage(ctx, p, m)
283
	bs.strategy.MessageSent(p, m)
284 285
}

286
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
287
	log.Debugf("Sending %v to peers that want it", block.Key())
288

289 290
	for _, p := range bs.strategy.Peers() {
		if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
291
			log.Debugf("%v wants %v", p, block.Key())
292 293
			if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
				message := bsmsg.New()
294
				message.AddBlock(block)
295
				for _, wanted := range bs.wantlist.Keys() {
296
					message.AddWanted(wanted)
297
				}
298
				bs.send(ctx, p, message)
299 300 301 302
			}
		}
	}
}