bitswap.go 4.95 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package bitswap

import (
4
	"errors"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
5 6
	"time"

7
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
8 9
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

10
	bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
11
	bsnet "github.com/jbenet/go-ipfs/bitswap/network"
12
	notifications "github.com/jbenet/go-ipfs/bitswap/notifications"
13
	strategy "github.com/jbenet/go-ipfs/bitswap/strategy"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	blocks "github.com/jbenet/go-ipfs/blocks"
15
	blockstore "github.com/jbenet/go-ipfs/blockstore"
16
	peer "github.com/jbenet/go-ipfs/peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

20 21
// TODO(brian): ensure messages are being received

22 23 24 25 26
// PartnerWantListMax is the bound for the number of keys we'll store per
// partner. These are usually taken from the top of the Partner's WantList
// advertisements. WantLists are sorted in terms of priority.
//
// NOTE(review): nothing in the visible part of this file reads this constant;
// confirm where (or whether) the bound is actually enforced.
const PartnerWantListMax = 10

27 28
// bitswap instances implement the bitswap protocol.
type bitswap struct {
29
	// peer is the identity of this (local) node.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30
	peer *peer.Peer
31

32
	// sender delivers messages on behalf of the session
33
	sender bsnet.NetworkAdapter
34

35
	// blockstore is the local database
36
	// NB: ensure threadsafety
37
	blockstore blockstore.Blockstore
38

39
	// routing interface for communication
40
	routing Directory
41

42
	notifications notifications.PubSub
43

44
	// strategy listens to network traffic and makes decisions about how to
45
	// interact with partners.
46 47
	// TODO(brian): save the strategy's state to the datastore
	strategy strategy.Strategy
48 49
}

50
// NewSession initializes a bitswap session.
51
func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d ds.Datastore, directory Directory) Exchange {
52

53
	// FIXME(brian): instantiate a concrete Strategist
54
	receiver := bsnet.Forwarder{}
55
	bs := &bitswap{
56
		blockstore:    blockstore.NewBlockstore(d),
57
		notifications: notifications.New(),
58
		strategy:      strategy.New(),
59
		peer:          p,
60
		routing:       directory,
61
		sender:        bsnet.NewNetworkAdapter(s, &receiver),
62
	}
63
	receiver.Delegate(bs)
64 65

	return bs
66 67 68
}

// GetBlock attempts to retrieve a particular block from peers, within timeout.
69
func (bs *bitswap) Block(k u.Key, timeout time.Duration) (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
	*blocks.Block, error) {
71
	begin := time.Now()
72
	tleft := timeout - time.Now().Sub(begin)
73
	provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
74

75
	blockChannel := make(chan blocks.Block)
76
	after := time.After(tleft)
77

78
	// TODO: when the data is received, shut down this for loop ASAP
79 80 81 82 83 84 85 86
	go func() {
		for p := range provs_ch {
			go func(pr *peer.Peer) {
				blk, err := bs.getBlock(k, pr, tleft)
				if err != nil {
					return
				}
				select {
87
				case blockChannel <- *blk:
88 89 90 91 92
				default:
				}
			}(p)
		}
	}()
93 94

	select {
95 96
	case block := <-blockChannel:
		close(blockChannel)
97
		return &block, nil
98 99
	case <-after:
		return nil, u.ErrTimeout
100
	}
101 102
}

103
func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) {
104

105 106 107
	ctx, _ := context.WithTimeout(context.Background(), timeout)
	blockChannel := bs.notifications.Subscribe(ctx, k)

108
	message := bsmsg.New()
109
	message.AppendWanted(k)
110 111 112 113

	// FIXME(brian): register the accountant on the service wrapper to ensure
	// that accounting is _always_ performed when SendMessage and
	// ReceiveMessage are called
114
	bs.sender.SendMessage(ctx, p, message)
115
	bs.strategy.MessageSent(p, message)
116

117 118
	block, ok := <-blockChannel
	if !ok {
119 120
		return nil, u.ErrTimeout
	}
121
	return &block, nil
122 123
}

124
func (bs *bitswap) sendToPeersThatWant(block blocks.Block) {
125 126 127
	for _, p := range bs.strategy.Peers() {
		if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
			if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
128 129 130 131 132 133
				go bs.send(p, block)
			}
		}
	}
}

134
// HasBlock announces the existance of a block to bitswap, potentially sending
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135
// it to peers (Partners) whose WantLists include it.
136
func (bs *bitswap) HasBlock(blk blocks.Block) error {
137
	go bs.sendToPeersThatWant(blk)
138 139 140
	return bs.routing.Provide(blk.Key())
}

141
// TODO(brian): get a return value
142
func (bs *bitswap) send(p *peer.Peer, b blocks.Block) {
143
	message := bsmsg.New()
144
	message.AppendBlock(b)
145
	// FIXME(brian): pass ctx
146
	bs.sender.SendMessage(context.Background(), p, message)
147
	bs.strategy.MessageSent(p, message)
148
}
149

150
// TODO(brian): handle errors
151
func (bs *bitswap) ReceiveMessage(
152
	ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
153
	*peer.Peer, bsmsg.BitSwapMessage, error) {
154

155
	bs.strategy.MessageReceived(sender, incoming)
156

157 158
	if incoming.Blocks() != nil {
		for _, block := range incoming.Blocks() {
159 160
			go bs.blockstore.Put(block) // FIXME(brian): err ignored
			go bs.notifications.Publish(block)
161 162 163 164
		}
	}

	if incoming.Wantlist() != nil {
165
		for _, key := range incoming.Wantlist() {
166
			if bs.strategy.ShouldSendBlockToPeer(key, sender) {
167 168 169 170 171 172 173
				block, errBlockNotFound := bs.blockstore.Get(key)
				if errBlockNotFound != nil {
					// TODO(brian): log/return the error
					continue
				}
				go bs.send(sender, *block)
			}
174 175
		}
	}
176 177
	return nil, nil, errors.New("TODO implement")
}
178 179 180 181

// numBytes reports the length of b's Data.
func numBytes(b blocks.Block) int {
	size := len(b.Data)
	return size
}