bitswap.go 6.81 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package bitswap

import (
4
	"errors"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
5 6
	"time"

7
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
8 9
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

10
	bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
11
	notifications "github.com/jbenet/go-ipfs/bitswap/notifications"
12
	tx "github.com/jbenet/go-ipfs/bitswap/transmission"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	blocks "github.com/jbenet/go-ipfs/blocks"
14
	net "github.com/jbenet/go-ipfs/net"
15
	peer "github.com/jbenet/go-ipfs/peer"
16
	routing "github.com/jbenet/go-ipfs/routing"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

20 21
// TODO(brian): ensure messages are being received

22 23 24 25 26 27 28 29
// PartnerWantListMax is the bound for the number of keys we'll store per
// partner. These are usually taken from the top of the Partner's WantList
// advertisements. WantLists are sorted in terms of priority.
const PartnerWantListMax = 10

// KeySet is just a convenient alias for maps of keys, where we only care
// access/lookups.
type KeySet map[u.Key]struct{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31
// BitSwap instances implement the bitswap protocol.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32
type BitSwap struct {
33
	// peer is the identity of this (local) node.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34
	peer *peer.Peer
35

36 37
	// sender delivers messages on behalf of the session
	sender tx.Sender
38

39
	// datastore is the local database // Ledgers of known
40 41
	datastore ds.Datastore

42
	// routing interface for communication
43
	routing routing.IpfsRouting
44

45
	notifications notifications.PubSub
46

47 48 49 50
	// partners is a map of currently active bitswap relationships.
	// The Ledger has the peer.ID, and the peer connection works through net.
	// Ledgers of known relationships (active or inactive) stored in datastore.
	// Changes to the Ledger should be committed to the datastore.
51
	partners LedgerMap
52 53 54 55 56 57

	// haveList is the set of keys we have values for. a map for fast lookups.
	// haveList KeySet -- not needed. all values in datastore?

	// wantList is the set of keys we want values for. a map for fast lookups.
	wantList KeySet
58

59 60
	strategy StrategyFunc

61
	haltChan chan struct{}
62 63
}

64
// NewSession initializes a bitswap session.
65
func NewSession(parent context.Context, s net.Sender, p *peer.Peer, d ds.Datastore, r routing.IpfsRouting) *BitSwap {
66 67 68

	// TODO(brian): define a contract for management of async operations that
	// fall under bitswap's purview
69
	// ctx, _ := context.WithCancel(parent)
70

71
	receiver := tx.Forwarder{}
72
	sender := tx.NewSender(s)
73
	bs := &BitSwap{
74 75 76 77 78
		peer:          p,
		datastore:     d,
		partners:      LedgerMap{},
		wantList:      KeySet{},
		routing:       r,
79
		sender:        sender,
80
		haltChan:      make(chan struct{}),
81
		notifications: notifications.New(),
82
		strategy:      YesManStrategy,
83
	}
84
	receiver.Delegate(bs)
85 86

	return bs
87 88 89
}

// GetBlock attempts to retrieve a particular block from peers, within timeout.
90
func (bs *BitSwap) Block(k u.Key, timeout time.Duration) (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
91
	*blocks.Block, error) {
92
	u.DOut("Bitswap GetBlock: '%s'\n", k.Pretty())
93
	begin := time.Now()
94
	tleft := timeout - time.Now().Sub(begin)
95
	provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
96

97
	blockChannel := make(chan blocks.Block)
98
	after := time.After(tleft)
99

100
	// TODO: when the data is received, shut down this for loop ASAP
101 102 103 104 105
	go func() {
		for p := range provs_ch {
			go func(pr *peer.Peer) {
				blk, err := bs.getBlock(k, pr, tleft)
				if err != nil {
Jeromy's avatar
Jeromy committed
106
					u.PErr("getBlock returned: %v\n", err)
107 108 109
					return
				}
				select {
110
				case blockChannel <- *blk:
111 112 113 114 115
				default:
				}
			}(p)
		}
	}()
116 117

	select {
118 119
	case block := <-blockChannel:
		close(blockChannel)
120
		return &block, nil
121 122
	case <-after:
		return nil, u.ErrTimeout
123
	}
124 125
}

126
func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) {
Jeromy's avatar
Jeromy committed
127
	u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty())
128

129 130 131
	ctx, _ := context.WithTimeout(context.Background(), timeout)
	blockChannel := bs.notifications.Subscribe(ctx, k)

132
	message := bsmsg.New()
133
	message.AppendWanted(k)
134
	bs.sender.SendMessage(ctx, p, message)
135

136 137
	block, ok := <-blockChannel
	if !ok {
138
		u.PErr("getBlock for '%s' timed out.\n", k.Pretty())
139 140
		return nil, u.ErrTimeout
	}
141
	return &block, nil
142 143
}

144
// HasBlock announces the existance of a block to BitSwap, potentially sending
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
145
// it to peers (Partners) whose WantLists include it.
146
func (bs *BitSwap) HasBlock(blk blocks.Block) error {
147 148
	go func() {
		for _, ledger := range bs.partners {
149
			if ledger.WantListContains(blk.Key()) {
150 151 152 153 154 155 156 157 158 159
				//send block to node
				if ledger.ShouldSend() {
					bs.SendBlock(ledger.Partner, blk)
				}
			}
		}
	}()
	return bs.routing.Provide(blk.Key())
}

160
func (bs *BitSwap) SendBlock(p *peer.Peer, b blocks.Block) {
161
	message := bsmsg.New()
162
	// TODO(brian): change interface to accept value instead of pointer
163
	message.AppendBlock(b)
164
	bs.sender.SendMessage(context.Background(), p, message)
165
}
166

167 168
// peerWantsBlock will check if we have the block in question,
// and then if we do, check the ledger for whether or not we should send it.
169 170
func (bs *BitSwap) peerWantsBlock(p *peer.Peer, wanted u.Key) {
	u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), wanted.Pretty())
171
	ledger := bs.getLedger(p)
172

173
	blk_i, err := bs.datastore.Get(wanted.DatastoreKey())
174 175
	if err != nil {
		if err == ds.ErrNotFound {
176
			ledger.Wants(wanted)
177
		}
178
		u.PErr("datastore get error: %v\n", err)
179 180
		return
	}
Jeromy's avatar
Jeromy committed
181

182
	blk, ok := blk_i.([]byte)
183
	if !ok {
184
		u.PErr("data conversion error.\n")
185 186 187
		return
	}

188
	if ledger.ShouldSend() {
189 190 191 192 193
		u.DOut("Sending block to peer.\n")
		bblk, err := blocks.NewBlock(blk)
		if err != nil {
			u.PErr("newBlock error: %v\n", err)
			return
194
		}
195
		bs.SendBlock(p, *bblk)
196
		ledger.SentBytes(len(blk))
197 198
	} else {
		u.DOut("Decided not to send block.")
199 200 201
	}
}

202
func (bs *BitSwap) blockReceive(p *peer.Peer, blk blocks.Block) {
203 204 205 206 207 208 209
	u.DOut("blockReceive: %s\n", blk.Key().Pretty())
	err := bs.datastore.Put(ds.NewKey(string(blk.Key())), blk.Data)
	if err != nil {
		u.PErr("blockReceive error: %v\n", err)
		return
	}

210
	bs.notifications.Publish(blk)
211

Brian Tiger Chow's avatar
Brian Tiger Chow committed
212
	ledger := bs.getLedger(p)
213 214 215
	ledger.ReceivedBytes(len(blk.Data))
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
216
func (bs *BitSwap) getLedger(p *peer.Peer) *Ledger {
217
	l, ok := bs.partners[p.Key()]
218 219 220 221 222
	if ok {
		return l
	}

	l = new(Ledger)
223
	l.Strategy = bs.strategy
224 225
	l.Partner = p
	bs.partners[p.Key()] = l
226 227
	return l
}
228

229
func (bs *BitSwap) SendWantList(wl KeySet) error {
230
	message := bsmsg.New()
231
	for k, _ := range wl {
232
		message.AppendWanted(k)
233 234 235 236
	}

	// Lets just ping everybody all at once
	for _, ledger := range bs.partners {
237
		bs.sender.SendMessage(context.TODO(), ledger.Partner, message)
238 239 240 241 242
	}

	return nil
}

243 244 245
func (bs *BitSwap) Halt() {
	bs.haltChan <- struct{}{}
}
246

247 248
func (bs *BitSwap) ReceiveMessage(
	ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
249
	bsmsg.BitSwapMessage, *peer.Peer, error) {
250 251 252 253 254 255 256 257 258 259 260
	if incoming.Blocks() != nil {
		for _, block := range incoming.Blocks() {
			go bs.blockReceive(sender, block)
		}
	}

	if incoming.Wantlist() != nil {
		for _, want := range incoming.Wantlist() {
			go bs.peerWantsBlock(sender, want)
		}
	}
261 262
	return nil, nil, errors.New("TODO implement")
}