bitswap.go 5.98 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package bitswap

import (
4
	"errors"
5
	"sync"
6

7
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
8 9
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	blocks "github.com/jbenet/go-ipfs/blocks"
11
	blockstore "github.com/jbenet/go-ipfs/blockstore"
12 13 14 15 16
	exchange "github.com/jbenet/go-ipfs/exchange"
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
17
	peer "github.com/jbenet/go-ipfs/peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20
)

21 22
// NetMessageSession initializes a BitSwap session that communicates over the
// provided NetMessage service
23
func NetMessageSession(parent context.Context, p *peer.Peer, s bsnet.NetMessageService, directory bsnet.Routing, d ds.Datastore) exchange.Interface {
24

25
	networkAdapter := bsnet.NetMessageAdapter(s, nil)
26 27 28 29 30
	bs := &bitswap{
		blockstore:    blockstore.NewBlockstore(d),
		notifications: notifications.New(),
		strategy:      strategy.New(),
		routing:       directory,
31
		sender:        networkAdapter,
32 33 34
		wantlist: WantList{
			data: make(map[u.Key]struct{}),
		},
35
	}
36
	networkAdapter.SetDelegate(bs)
37 38 39 40

	return bs
}

41 42
// bitswap instances implement the bitswap protocol.
type bitswap struct {
43

44
	// sender delivers messages on behalf of the session
45
	sender bsnet.Adapter
46

47
	// blockstore is the local database
48
	// NB: ensure threadsafety
49
	blockstore blockstore.Blockstore
50

51
	// routing interface for communication
52
	routing bsnet.Routing
53

54
	notifications notifications.PubSub
55

56
	// strategy listens to network traffic and makes decisions about how to
57
	// interact with partners.
58 59
	// TODO(brian): save the strategy's state to the datastore
	strategy strategy.Strategy
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92

	wantlist WantList
}

type WantList struct {
	lock sync.RWMutex
	data map[u.Key]struct{}
}

func (wl *WantList) Add(k u.Key) {
	u.DOut("Adding %v to Wantlist\n", k.Pretty())
	wl.lock.Lock()
	defer wl.lock.Unlock()

	wl.data[k] = struct{}{}
}

func (wl *WantList) Remove(k u.Key) {
	u.DOut("Removing %v from Wantlist\n", k.Pretty())
	wl.lock.Lock()
	defer wl.lock.Unlock()

	delete(wl.data, k)
}

func (wl *WantList) Keys() []u.Key {
	wl.lock.RLock()
	defer wl.lock.RUnlock()
	keys := make([]u.Key, 0)
	for k, _ := range wl.data {
		keys = append(keys, k)
	}
	return keys
93 94
}

95 96
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context
97 98
//
// TODO ensure only one active request per key
99
func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
100
	u.DOut("Get Block %v\n", k.Pretty())
101

102
	ctx, cancelFunc := context.WithCancel(parent)
103
	bs.wantlist.Add(k)
104
	promise := bs.notifications.Subscribe(ctx, k)
105

106 107
	const maxProviders = 20
	peersToQuery := bs.routing.FindProvidersAsync(ctx, k, maxProviders)
108

109
	go func() {
110
		message := bsmsg.New()
111 112 113
		for _, wanted := range bs.wantlist.Keys() {
			message.AppendWanted(wanted)
		}
114
		message.AppendWanted(k)
115
		for iiiii := range peersToQuery {
116
			// u.DOut("bitswap got peersToQuery: %s\n", iiiii)
117
			go func(p *peer.Peer) {
118
				response, err := bs.sender.SendRequest(ctx, p, message)
119 120 121
				if err != nil {
					return
				}
122 123 124 125 126
				// FIXME ensure accounting is handled correctly when
				// communication fails. May require slightly different API to
				// get better guarantees. May need shared sequence numbers.
				bs.strategy.MessageSent(p, message)

127 128 129
				if response == nil {
					return
				}
130
				bs.ReceiveMessage(ctx, p, response)
131
			}(iiiii)
132 133
		}
	}()
134 135

	select {
136 137
	case block := <-promise:
		cancelFunc()
138
		bs.wantlist.Remove(k)
139
		// TODO remove from wantlist
140
		return &block, nil
141 142
	case <-parent.Done():
		return nil, parent.Err()
143 144 145
	}
}

146
// HasBlock announces the existance of a block to bitswap, potentially sending
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147
// it to peers (Partners) whose WantLists include it.
148
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
149 150
	u.DOut("Has Block %v\n", blk.Key().Pretty())
	bs.wantlist.Remove(blk.Key())
151
	bs.sendToPeersThatWant(ctx, blk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152
	return bs.routing.Provide(ctx, blk.Key())
153 154
}

155
// TODO(brian): handle errors
156
func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
157
	*peer.Peer, bsmsg.BitSwapMessage, error) {
158
	u.DOut("ReceiveMessage from %v\n", p.Key().Pretty())
159

160 161 162 163 164 165
	if p == nil {
		return nil, nil, errors.New("Received nil Peer")
	}
	if incoming == nil {
		return nil, nil, errors.New("Received nil Message")
	}
166

167
	bs.strategy.MessageReceived(p, incoming) // FIRST
168

169
	for _, block := range incoming.Blocks() {
170 171 172
		// TODO verify blocks?
		if err := bs.blockstore.Put(block); err != nil {
			continue // FIXME(brian): err ignored
173
		}
174
		go bs.notifications.Publish(block)
175
		go func(block blocks.Block) {
176
			_ = bs.HasBlock(ctx, block) // FIXME err ignored
177
		}(block)
178 179
	}

180 181 182 183
	message := bsmsg.New()
	for _, wanted := range bs.wantlist.Keys() {
		message.AppendWanted(wanted)
	}
184 185
	for _, key := range incoming.Wantlist() {
		if bs.strategy.ShouldSendBlockToPeer(key, p) {
186 187 188 189
			if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
				continue
			} else {
				message.AppendBlock(*block)
190
			}
191 192
		}
	}
193 194
	defer bs.strategy.MessageSent(p, message)
	return p, message, nil
195
}
196

197 198 199
// send strives to ensure that accounting is always performed when a message is
// sent
func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
200
	bs.sender.SendMessage(ctx, p, m)
201
	go bs.strategy.MessageSent(p, m)
202 203
}

204
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
205
	u.DOut("Sending %v to peers that want it\n", block.Key().Pretty())
206 207
	for _, p := range bs.strategy.Peers() {
		if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
208
			u.DOut("%v wants %v\n", p.Key().Pretty(), block.Key().Pretty())
209 210 211
			if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
				message := bsmsg.New()
				message.AppendBlock(block)
212 213 214
				for _, wanted := range bs.wantlist.Keys() {
					message.AppendWanted(wanted)
				}
215
				go bs.send(ctx, p, message)
216 217 218 219
			}
		}
	}
}