bitswap.go 6.81 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package bitswap

import (
4
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5
	blocks "github.com/jbenet/go-ipfs/blocks"
6
	peer "github.com/jbenet/go-ipfs/peer"
7
	routing "github.com/jbenet/go-ipfs/routing"
8
	dht "github.com/jbenet/go-ipfs/routing/dht"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	swarm "github.com/jbenet/go-ipfs/swarm"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	u "github.com/jbenet/go-ipfs/util"
11

12
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
13

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

17 18 19 20 21 22 23 24
// PartnerWantListMax is the bound for the number of keys we'll store per
// partner. These are usually taken from the top of the Partner's WantList
// advertisements. WantLists are sorted in terms of priority.
const PartnerWantListMax = 10

// KeySet is just a convenient alias for maps of keys, where we only care
// access/lookups.
type KeySet map[u.Key]struct{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26
// BitSwap instances implement the bitswap protocol.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27
type BitSwap struct {
28
	// peer is the identity of this (local) node.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29
	peer *peer.Peer
30 31

	// net holds the connections to all peers.
32 33
	net     swarm.Network
	meschan *swarm.Chan
34

35
	// datastore is the local database // Ledgers of known
36 37
	datastore ds.Datastore

38
	// routing interface for communication
39
	routing *dht.IpfsDHT
40

41
	listener *swarm.MessageListener
42

43 44 45 46 47 48 49 50 51 52 53
	// partners is a map of currently active bitswap relationships.
	// The Ledger has the peer.ID, and the peer connection works through net.
	// Ledgers of known relationships (active or inactive) stored in datastore.
	// Changes to the Ledger should be committed to the datastore.
	partners map[u.Key]*Ledger

	// haveList is the set of keys we have values for. a map for fast lookups.
	// haveList KeySet -- not needed. all values in datastore?

	// wantList is the set of keys we want values for. a map for fast lookups.
	wantList KeySet
54

55 56
	strategy StrategyFunc

57
	haltChan chan struct{}
58 59
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60
// NewBitSwap creates a new BitSwap instance. It does not check its parameters.
61 62
func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap {
	bs := &BitSwap{
63 64 65
		peer:      p,
		net:       net,
		datastore: d,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66 67
		partners:  LedgerMap{},
		wantList:  KeySet{},
68
		routing:   r.(*dht.IpfsDHT),
69 70
		meschan:   net.GetChannel(swarm.PBWrapper_BITSWAP),
		haltChan:  make(chan struct{}),
71
		listener:  swarm.NewMessageListener(),
72
	}
73 74 75

	go bs.handleMessages()
	return bs
76 77 78
}

// GetBlock attempts to retrieve a particular block from peers, within timeout.
79
func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80
	*blocks.Block, error) {
81
	u.DOut("Bitswap GetBlock: '%s'\n", k.Pretty())
82
	begin := time.Now()
83
	tleft := timeout - time.Now().Sub(begin)
84
	provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
85 86 87

	valchan := make(chan []byte)
	after := time.After(tleft)
88

89
	// TODO: when the data is received, shut down this for loop ASAP
90 91 92 93 94
	go func() {
		for p := range provs_ch {
			go func(pr *peer.Peer) {
				blk, err := bs.getBlock(k, pr, tleft)
				if err != nil {
Jeromy's avatar
Jeromy committed
95
					u.PErr("getBlock returned: %v\n", err)
96 97 98 99 100 101 102 103 104
					return
				}
				select {
				case valchan <- blk:
				default:
				}
			}(p)
		}
	}()
105 106 107

	select {
	case blkdata := <-valchan:
Jeromy's avatar
Jeromy committed
108
		close(valchan)
109 110 111
		return blocks.NewBlock(blkdata)
	case <-after:
		return nil, u.ErrTimeout
112
	}
113 114
}

115
func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) {
Jeromy's avatar
Jeromy committed
116
	u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty())
117

118 119
	message := newMessage()
	message.AppendWanted(k)
120 121

	after := time.After(timeout)
122
	resp := bs.listener.Listen(string(k), 1, timeout)
123
	bs.meschan.Outgoing <- message.ToSwarm(p)
124 125 126

	select {
	case resp_mes := <-resp:
127
		return resp_mes.Data, nil
128
	case <-after:
129
		u.PErr("getBlock for '%s' timed out.\n", k.Pretty())
130 131 132 133
		return nil, u.ErrTimeout
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
134 135
// HaveBlock announces the existance of a block to BitSwap, potentially sending
// it to peers (Partners) whose WantLists include it.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
func (bs *BitSwap) HaveBlock(blk *blocks.Block) error {
	go func() {
		for _, ledger := range bs.partners {
			if _, ok := ledger.WantList[blk.Key()]; ok {
				//send block to node
				if ledger.ShouldSend() {
					bs.SendBlock(ledger.Partner, blk)
				}
			}
		}
	}()
	return bs.routing.Provide(blk.Key())
}

func (bs *BitSwap) SendBlock(p *peer.Peer, b *blocks.Block) {
151 152 153
	message := newMessage()
	message.AppendBlock(b)
	bs.meschan.Outgoing <- message.ToSwarm(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154
}
155 156 157 158

func (bs *BitSwap) handleMessages() {
	for {
		select {
159 160 161 162 163 164 165
		case mes := <-bs.meschan.Incoming:
			pmes := new(PBMessage)
			err := proto.Unmarshal(mes.Data, pmes)
			if err != nil {
				u.PErr("%v\n", err)
				continue
			}
166 167 168 169 170 171 172 173 174
			if pmes.Blocks != nil {
				for _, blkData := range pmes.Blocks {
					blk, err := blocks.NewBlock(blkData)
					if err != nil {
						u.PErr("%v\n", err)
						continue
					}
					go bs.blockReceive(mes.Peer, blk)
				}
175 176
			}

177 178 179 180
			if pmes.Wantlist != nil {
				for _, want := range pmes.Wantlist {
					go bs.peerWantsBlock(mes.Peer, want)
				}
181
			}
182
		case <-bs.haltChan:
183
			return
184 185 186
		}
	}
}
187

188 189 190 191
// peerWantsBlock will check if we have the block in question,
// and then if we do, check the ledger for whether or not we should send it.
func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) {
	u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), u.Key(want).Pretty())
192 193
	ledg := bs.GetLedger(p)

194 195
	dsk := ds.NewKey(want)
	blk_i, err := bs.datastore.Get(dsk)
196 197
	if err != nil {
		if err == ds.ErrNotFound {
198 199
			// TODO: this needs to be different. We need timeouts.
			ledg.WantList[u.Key(want)] = struct{}{}
200
		}
201
		u.PErr("datastore get error: %v\n", err)
202 203
		return
	}
Jeromy's avatar
Jeromy committed
204

205
	blk, ok := blk_i.([]byte)
206
	if !ok {
207
		u.PErr("data conversion error.\n")
208 209 210
		return
	}

211 212 213 214 215 216
	if ledg.ShouldSend() {
		u.DOut("Sending block to peer.\n")
		bblk, err := blocks.NewBlock(blk)
		if err != nil {
			u.PErr("newBlock error: %v\n", err)
			return
217
		}
218 219
		bs.SendBlock(p, bblk)
		ledg.SentBytes(len(blk))
220 221
	} else {
		u.DOut("Decided not to send block.")
222 223 224
	}
}

225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) {
	u.DOut("blockReceive: %s\n", blk.Key().Pretty())
	err := bs.datastore.Put(ds.NewKey(string(blk.Key())), blk.Data)
	if err != nil {
		u.PErr("blockReceive error: %v\n", err)
		return
	}

	mes := &swarm.Message{
		Peer: p,
		Data: blk.Data,
	}
	bs.listener.Respond(string(blk.Key()), mes)

	ledger := bs.GetLedger(p)
	ledger.ReceivedBytes(len(blk.Data))
}

243 244
func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger {
	l, ok := bs.partners[p.Key()]
245 246 247 248 249
	if ok {
		return l
	}

	l = new(Ledger)
250
	l.Strategy = bs.strategy
251 252
	l.Partner = p
	bs.partners[p.Key()] = l
253 254
	return l
}
255

256
func (bs *BitSwap) SendWantList(wl KeySet) error {
257
	message := newMessage()
258
	for k, _ := range wl {
259
		message.AppendWanted(k)
260 261 262 263
	}

	// Lets just ping everybody all at once
	for _, ledger := range bs.partners {
264
		bs.meschan.Outgoing <- message.ToSwarm(ledger.Partner)
265 266 267 268 269
	}

	return nil
}

270 271 272
func (bs *BitSwap) Halt() {
	bs.haltChan <- struct{}{}
}
273 274 275 276 277 278 279

func (bs *BitSwap) SetStrategy(sf StrategyFunc) {
	bs.strategy = sf
	for _, ledg := range bs.partners {
		ledg.Strategy = sf
	}
}