Commit b8fcc137 authored by Brian Tiger Chow's avatar Brian Tiger Chow

refac(bitswap) inline helper methods

for readability
parent 44484941
......@@ -10,6 +10,7 @@ import (
bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/bitswap/network"
notifications "github.com/jbenet/go-ipfs/bitswap/notifications"
strategy "github.com/jbenet/go-ipfs/bitswap/strategy"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blockstore"
peer "github.com/jbenet/go-ipfs/peer"
......@@ -36,6 +37,7 @@ type bitswap struct {
sender bsnet.NetworkAdapter
// blockstore is the local database
// NB: ensure threadsafety
blockstore blockstore.Blockstore
// routing interface for communication
......@@ -43,10 +45,11 @@ type bitswap struct {
notifications notifications.PubSub
// partners is a map of currently active bitswap relationships.
// The Ledger has the peer.ID, and the peer connection works through net.
// Ledgers of known relationships (active or inactive) stored in datastore.
// Changes to the Ledger should be committed to the datastore.
// strategist listens to network traffic and makes decisions about how to
// interact with partners.
// TODO(brian): save the strategist's state to the datastore
strategist strategy.Strategist
partners ledgerMap
// haveList is the set of keys we have values for. a map for fast lookups.
......@@ -60,6 +63,7 @@ type bitswap struct {
// NewSession initializes a bitswap session.
func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d ds.Datastore, directory Directory) Exchange {
// FIXME(brian): instantiate a concrete Strategist
receiver := bsnet.Forwarder{}
bs := &bitswap{
peer: p,
......@@ -79,7 +83,6 @@ func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d
// GetBlock attempts to retrieve a particular block from peers, within timeout.
func (bs *bitswap) Block(k u.Key, timeout time.Duration) (
*blocks.Block, error) {
u.DOut("Bitswap GetBlock: '%s'\n", k.Pretty())
begin := time.Now()
tleft := timeout - time.Now().Sub(begin)
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
......@@ -93,7 +96,6 @@ func (bs *bitswap) Block(k u.Key, timeout time.Duration) (
go func(pr *peer.Peer) {
blk, err := bs.getBlock(k, pr, tleft)
if err != nil {
u.PErr("getBlock returned: %v\n", err)
return
}
select {
......@@ -114,113 +116,80 @@ func (bs *bitswap) Block(k u.Key, timeout time.Duration) (
}
func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) {
u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty())
ctx, _ := context.WithTimeout(context.Background(), timeout)
blockChannel := bs.notifications.Subscribe(ctx, k)
message := bsmsg.New()
message.AppendWanted(k)
// FIXME(brian): register the accountant on the service wrapper to ensure
// that accounting is _always_ performed when SendMessage and
// ReceiveMessage are called
bs.sender.SendMessage(ctx, p, message)
bs.strategist.MessageSent(p, message)
block, ok := <-blockChannel
if !ok {
u.PErr("getBlock for '%s' timed out.\n", k.Pretty())
return nil, u.ErrTimeout
}
return &block, nil
}
func (bs *bitswap) sendToPeersThatWant(block blocks.Block) {
for _, p := range bs.strategist.Peers() {
if bs.strategist.IsWantedByPeer(block.Key(), p) {
if bs.strategist.ShouldSendToPeer(block.Key(), p) {
go bs.send(p, block)
}
}
}
}
// HasBlock announces the existance of a block to bitswap, potentially sending
// it to peers (Partners) whose WantLists include it.
func (bs *bitswap) HasBlock(blk blocks.Block) error {
go func() {
for _, ledger := range bs.partners {
if ledger.WantListContains(blk.Key()) {
//send block to node
if ledger.ShouldSend() {
bs.sendBlock(ledger.Partner, blk)
}
}
}
}()
go bs.sendToPeersThatWant(blk)
return bs.routing.Provide(blk.Key())
}
// TODO(brian): get a return value
func (bs *bitswap) sendBlock(p *peer.Peer, b blocks.Block) {
u.DOut("Sending block to peer.\n")
func (bs *bitswap) send(p *peer.Peer, b blocks.Block) {
message := bsmsg.New()
// TODO(brian): change interface to accept value instead of pointer
message.AppendBlock(b)
// FIXME(brian): pass ctx
bs.sender.SendMessage(context.Background(), p, message)
}
// peerWantsBlock will check if we have the block in question,
// and then if we do, check the ledger for whether or not we should send it.
func (bs *bitswap) peerWantsBlock(p *peer.Peer, wanted u.Key) {
u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), wanted.Pretty())
ledger := bs.getLedger(p)
if !ledger.ShouldSend() {
return
}
block, err := bs.blockstore.Get(wanted)
if err != nil { // TODO(brian): log/return the error
ledger.Wants(wanted)
return
}
bs.sendBlock(p, *block)
ledger.SentBytes(numBytes(*block))
}
// TODO(brian): return error
func (bs *bitswap) blockReceive(p *peer.Peer, blk blocks.Block) {
u.DOut("blockReceive: %s\n", blk.Key().Pretty())
err := bs.blockstore.Put(blk)
if err != nil {
u.PErr("blockReceive error: %v\n", err)
return
}
bs.notifications.Publish(blk)
ledger := bs.getLedger(p)
ledger.ReceivedBytes(len(blk.Data))
}
func (bs *bitswap) getLedger(p *peer.Peer) *ledger {
l, ok := bs.partners[p.Key()]
if ok {
return l
}
l = new(ledger)
l.Strategy = bs.strategy
l.Partner = p
bs.partners[p.Key()] = l
return l
bs.strategist.MessageSent(p, message)
}
func (bs *bitswap) Halt() {
bs.haltChan <- struct{}{}
}
// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
bs.strategist.MessageReceived(sender, incoming)
if incoming.Blocks() != nil {
for _, block := range incoming.Blocks() {
go bs.blockReceive(sender, block)
go bs.blockstore.Put(block) // FIXME(brian): err ignored
go bs.notifications.Publish(block)
}
}
if incoming.Wantlist() != nil {
for _, want := range incoming.Wantlist() {
// TODO(brian): return the block synchronously
go bs.peerWantsBlock(sender, want)
for _, key := range incoming.Wantlist() {
if bs.strategist.ShouldSendToPeer(key, sender) {
block, errBlockNotFound := bs.blockstore.Get(key)
if errBlockNotFound != nil {
// TODO(brian): log/return the error
continue
}
go bs.send(sender, *block)
}
}
}
return nil, nil, errors.New("TODO implement")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment