bitswap.go 6.91 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package bitswap

import (
Brian Tiger Chow's avatar
Brian Tiger Chow committed
4 5 6 7 8
	"time"

	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	blocks "github.com/jbenet/go-ipfs/blocks"
10
	peer "github.com/jbenet/go-ipfs/peer"
11
	routing "github.com/jbenet/go-ipfs/routing"
12
	dht "github.com/jbenet/go-ipfs/routing/dht"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	swarm "github.com/jbenet/go-ipfs/swarm"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

17 18 19 20 21 22 23 24
// PartnerWantListMax is the bound for the number of keys we'll store per
// partner. These are usually taken from the top of the Partner's WantList
// advertisements. WantLists are sorted in terms of priority.
const PartnerWantListMax = 10

// KeySet is just a convenient alias for maps of keys, where we only care
// access/lookups.
type KeySet map[u.Key]struct{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26
// BitSwap instances implement the bitswap protocol.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27
type BitSwap struct {
28
	// peer is the identity of this (local) node.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29
	peer *peer.Peer
30 31

	// net holds the connections to all peers.
32 33
	net     swarm.Network
	meschan *swarm.Chan
34

35
	// datastore is the local database // Ledgers of known
36 37
	datastore ds.Datastore

38
	// routing interface for communication
39
	routing *dht.IpfsDHT
40

41
	listener *swarm.MessageListener
42

43 44 45 46 47 48 49 50 51 52 53
	// partners is a map of currently active bitswap relationships.
	// The Ledger has the peer.ID, and the peer connection works through net.
	// Ledgers of known relationships (active or inactive) stored in datastore.
	// Changes to the Ledger should be committed to the datastore.
	partners map[u.Key]*Ledger

	// haveList is the set of keys we have values for. a map for fast lookups.
	// haveList KeySet -- not needed. all values in datastore?

	// wantList is the set of keys we want values for. a map for fast lookups.
	wantList KeySet
54

55 56
	strategy StrategyFunc

57
	haltChan chan struct{}
58 59
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60
// NewBitSwap creates a new BitSwap instance. It does not check its parameters.
61 62
func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap {
	bs := &BitSwap{
63 64 65
		peer:      p,
		net:       net,
		datastore: d,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66 67
		partners:  LedgerMap{},
		wantList:  KeySet{},
68
		routing:   r.(*dht.IpfsDHT),
69 70
		meschan:   net.GetChannel(swarm.PBWrapper_BITSWAP),
		haltChan:  make(chan struct{}),
71
		listener:  swarm.NewMessageListener(),
72
	}
73 74 75

	go bs.handleMessages()
	return bs
76 77 78
}

// GetBlock attempts to retrieve a particular block from peers, within timeout.
79
func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80
	*blocks.Block, error) {
81
	u.DOut("Bitswap GetBlock: '%s'\n", k.Pretty())
82
	begin := time.Now()
83
	tleft := timeout - time.Now().Sub(begin)
84
	provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
85 86 87

	valchan := make(chan []byte)
	after := time.After(tleft)
88

89
	// TODO: when the data is received, shut down this for loop ASAP
90 91 92 93 94
	go func() {
		for p := range provs_ch {
			go func(pr *peer.Peer) {
				blk, err := bs.getBlock(k, pr, tleft)
				if err != nil {
Jeromy's avatar
Jeromy committed
95
					u.PErr("getBlock returned: %v\n", err)
96 97 98 99 100 101 102 103 104
					return
				}
				select {
				case valchan <- blk:
				default:
				}
			}(p)
		}
	}()
105 106 107

	select {
	case blkdata := <-valchan:
Jeromy's avatar
Jeromy committed
108
		close(valchan)
109 110 111
		return blocks.NewBlock(blkdata)
	case <-after:
		return nil, u.ErrTimeout
112
	}
113 114
}

115
func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) {
Jeromy's avatar
Jeromy committed
116
	u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty())
117 118 119

	pmes := new(PBMessage)
	pmes.Wantlist = []string{string(k)}
120 121

	after := time.After(timeout)
122 123
	resp := bs.listener.Listen(string(k), 1, timeout)
	smes := swarm.NewMessage(p, pmes)
124 125 126 127
	bs.meschan.Outgoing <- smes

	select {
	case resp_mes := <-resp:
128
		return resp_mes.Data, nil
129
	case <-after:
130
		u.PErr("getBlock for '%s' timed out.\n", k.Pretty())
131 132 133 134
		return nil, u.ErrTimeout
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135 136
// HaveBlock announces the existance of a block to BitSwap, potentially sending
// it to peers (Partners) whose WantLists include it.
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
func (bs *BitSwap) HaveBlock(blk *blocks.Block) error {
	go func() {
		for _, ledger := range bs.partners {
			if _, ok := ledger.WantList[blk.Key()]; ok {
				//send block to node
				if ledger.ShouldSend() {
					bs.SendBlock(ledger.Partner, blk)
				}
			}
		}
	}()
	return bs.routing.Provide(blk.Key())
}

func (bs *BitSwap) SendBlock(p *peer.Peer, b *blocks.Block) {
	pmes := new(PBMessage)
	pmes.Blocks = [][]byte{b.Data}

	swarm_mes := swarm.NewMessage(p, pmes)
	bs.meschan.Outgoing <- swarm_mes
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
}
158 159 160 161

func (bs *BitSwap) handleMessages() {
	for {
		select {
162 163 164 165 166 167 168
		case mes := <-bs.meschan.Incoming:
			pmes := new(PBMessage)
			err := proto.Unmarshal(mes.Data, pmes)
			if err != nil {
				u.PErr("%v\n", err)
				continue
			}
169 170 171 172 173 174 175 176 177
			if pmes.Blocks != nil {
				for _, blkData := range pmes.Blocks {
					blk, err := blocks.NewBlock(blkData)
					if err != nil {
						u.PErr("%v\n", err)
						continue
					}
					go bs.blockReceive(mes.Peer, blk)
				}
178 179
			}

180 181 182 183
			if pmes.Wantlist != nil {
				for _, want := range pmes.Wantlist {
					go bs.peerWantsBlock(mes.Peer, want)
				}
184
			}
185
		case <-bs.haltChan:
186
			return
187 188 189
		}
	}
}
190

191 192 193 194
// peerWantsBlock will check if we have the block in question,
// and then if we do, check the ledger for whether or not we should send it.
func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) {
	u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), u.Key(want).Pretty())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
195
	ledg := bs.getLedger(p)
196

197 198
	dsk := ds.NewKey(want)
	blk_i, err := bs.datastore.Get(dsk)
199 200
	if err != nil {
		if err == ds.ErrNotFound {
201 202
			// TODO: this needs to be different. We need timeouts.
			ledg.WantList[u.Key(want)] = struct{}{}
203
		}
204
		u.PErr("datastore get error: %v\n", err)
205 206
		return
	}
Jeromy's avatar
Jeromy committed
207

208
	blk, ok := blk_i.([]byte)
209
	if !ok {
210
		u.PErr("data conversion error.\n")
211 212 213
		return
	}

214 215 216 217 218 219
	if ledg.ShouldSend() {
		u.DOut("Sending block to peer.\n")
		bblk, err := blocks.NewBlock(blk)
		if err != nil {
			u.PErr("newBlock error: %v\n", err)
			return
220
		}
221 222
		bs.SendBlock(p, bblk)
		ledg.SentBytes(len(blk))
223 224
	} else {
		u.DOut("Decided not to send block.")
225 226 227
	}
}

228 229 230 231 232 233 234 235 236 237 238 239 240 241
func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) {
	u.DOut("blockReceive: %s\n", blk.Key().Pretty())
	err := bs.datastore.Put(ds.NewKey(string(blk.Key())), blk.Data)
	if err != nil {
		u.PErr("blockReceive error: %v\n", err)
		return
	}

	mes := &swarm.Message{
		Peer: p,
		Data: blk.Data,
	}
	bs.listener.Respond(string(blk.Key()), mes)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
242
	ledger := bs.getLedger(p)
243 244 245
	ledger.ReceivedBytes(len(blk.Data))
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
246
func (bs *BitSwap) getLedger(p *peer.Peer) *Ledger {
247
	l, ok := bs.partners[p.Key()]
248 249 250 251 252
	if ok {
		return l
	}

	l = new(Ledger)
253
	l.Strategy = bs.strategy
254 255
	l.Partner = p
	bs.partners[p.Key()] = l
256 257
	return l
}
258

259
func (bs *BitSwap) SendWantList(wl KeySet) error {
260 261 262
	pmes := new(PBMessage)
	for k, _ := range wl {
		pmes.Wantlist = append(pmes.Wantlist, string(k))
263 264 265 266
	}

	// Lets just ping everybody all at once
	for _, ledger := range bs.partners {
267
		bs.meschan.Outgoing <- swarm.NewMessage(ledger.Partner, pmes)
268 269 270 271 272
	}

	return nil
}

273 274 275
func (bs *BitSwap) Halt() {
	bs.haltChan <- struct{}{}
}
276 277 278 279 280 281 282

func (bs *BitSwap) SetStrategy(sf StrategyFunc) {
	bs.strategy = sf
	for _, ledg := range bs.partners {
		ledg.Strategy = sf
	}
}