From b8fcc137a6c5114c6cf661a1096e7e09502f4573 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow <brian.holderchow@gmail.com> Date: Wed, 17 Sep 2014 18:22:22 -0700 Subject: [PATCH] refac(bitswap) inline helper methods for readability --- bitswap/bitswap.go | 115 +++++++++++++++++---------------------------- 1 file changed, 42 insertions(+), 73 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 7d128b87..2719eb0a 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -10,6 +10,7 @@ import ( bsmsg "github.com/jbenet/go-ipfs/bitswap/message" bsnet "github.com/jbenet/go-ipfs/bitswap/network" notifications "github.com/jbenet/go-ipfs/bitswap/notifications" + strategy "github.com/jbenet/go-ipfs/bitswap/strategy" blocks "github.com/jbenet/go-ipfs/blocks" blockstore "github.com/jbenet/go-ipfs/blockstore" peer "github.com/jbenet/go-ipfs/peer" @@ -36,6 +37,7 @@ type bitswap struct { sender bsnet.NetworkAdapter // blockstore is the local database + // NB: ensure threadsafety blockstore blockstore.Blockstore // routing interface for communication @@ -43,10 +45,11 @@ type bitswap struct { notifications notifications.PubSub - // partners is a map of currently active bitswap relationships. - // The Ledger has the peer.ID, and the peer connection works through net. - // Ledgers of known relationships (active or inactive) stored in datastore. - // Changes to the Ledger should be committed to the datastore. + // strategist listens to network traffic and makes decisions about how to + // interact with partners. + // TODO(brian): save the strategist's state to the datastore + strategist strategy.Strategist + partners ledgerMap // haveList is the set of keys we have values for. a map for fast lookups. @@ -60,6 +63,7 @@ type bitswap struct { // NewSession initializes a bitswap session. func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d ds.Datastore, directory Directory) Exchange { + // FIXME(brian): instantiate a concrete Strategist receiver := bsnet.Forwarder{} bs := &bitswap{ peer: p, @@ -79,7 +83,6 @@ func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d // GetBlock attempts to retrieve a particular block from peers, within timeout. func (bs *bitswap) Block(k u.Key, timeout time.Duration) ( *blocks.Block, error) { - u.DOut("Bitswap GetBlock: '%s'\n", k.Pretty()) begin := time.Now() tleft := timeout - time.Now().Sub(begin) provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout) @@ -93,7 +96,6 @@ func (bs *bitswap) Block(k u.Key, timeout time.Duration) ( go func(pr *peer.Peer) { blk, err := bs.getBlock(k, pr, tleft) if err != nil { - u.PErr("getBlock returned: %v\n", err) return } select { @@ -114,113 +116,80 @@ func (bs *bitswap) Block(k u.Key, timeout time.Duration) ( } func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) { - u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty()) ctx, _ := context.WithTimeout(context.Background(), timeout) blockChannel := bs.notifications.Subscribe(ctx, k) message := bsmsg.New() message.AppendWanted(k) + + // FIXME(brian): register the accountant on the service wrapper to ensure + // that accounting is _always_ performed when SendMessage and + // ReceiveMessage are called bs.sender.SendMessage(ctx, p, message) + bs.strategist.MessageSent(p, message) block, ok := <-blockChannel if !ok { - u.PErr("getBlock for '%s' timed out.\n", k.Pretty()) return nil, u.ErrTimeout } return &block, nil } +func (bs *bitswap) sendToPeersThatWant(block blocks.Block) { + for _, p := range bs.strategist.Peers() { + if bs.strategist.IsWantedByPeer(block.Key(), p) { + if bs.strategist.ShouldSendToPeer(block.Key(), p) { + go bs.send(p, block) + } + } + } +} + // HasBlock announces the existance of a block to bitswap, potentially sending // it to peers (Partners) whose WantLists include it. func (bs *bitswap) HasBlock(blk blocks.Block) error { - go func() { - for _, ledger := range bs.partners { - if ledger.WantListContains(blk.Key()) { - //send block to node - if ledger.ShouldSend() { - bs.sendBlock(ledger.Partner, blk) - } - } - } - }() + go bs.sendToPeersThatWant(blk) return bs.routing.Provide(blk.Key()) } // TODO(brian): get a return value -func (bs *bitswap) sendBlock(p *peer.Peer, b blocks.Block) { - u.DOut("Sending block to peer.\n") +func (bs *bitswap) send(p *peer.Peer, b blocks.Block) { message := bsmsg.New() - // TODO(brian): change interface to accept value instead of pointer message.AppendBlock(b) + // FIXME(brian): pass ctx bs.sender.SendMessage(context.Background(), p, message) -} - -// peerWantsBlock will check if we have the block in question, -// and then if we do, check the ledger for whether or not we should send it. -func (bs *bitswap) peerWantsBlock(p *peer.Peer, wanted u.Key) { - u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), wanted.Pretty()) - - ledger := bs.getLedger(p) - - if !ledger.ShouldSend() { - return - } - - block, err := bs.blockstore.Get(wanted) - if err != nil { // TODO(brian): log/return the error - ledger.Wants(wanted) - return - } - bs.sendBlock(p, *block) - ledger.SentBytes(numBytes(*block)) -} - -// TODO(brian): return error -func (bs *bitswap) blockReceive(p *peer.Peer, blk blocks.Block) { - u.DOut("blockReceive: %s\n", blk.Key().Pretty()) - err := bs.blockstore.Put(blk) - if err != nil { - u.PErr("blockReceive error: %v\n", err) - return - } - - bs.notifications.Publish(blk) - - ledger := bs.getLedger(p) - ledger.ReceivedBytes(len(blk.Data)) -} - -func (bs *bitswap) getLedger(p *peer.Peer) *ledger { - l, ok := bs.partners[p.Key()] - if ok { - return l - } - - l = new(ledger) - l.Strategy = bs.strategy - l.Partner = p - bs.partners[p.Key()] = l - return l + bs.strategist.MessageSent(p, message) } func (bs *bitswap) Halt() { bs.haltChan <- struct{}{} } +// TODO(brian): handle errors func (bs *bitswap) ReceiveMessage( ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( *peer.Peer, bsmsg.BitSwapMessage, error) { + + bs.strategist.MessageReceived(sender, incoming) + if incoming.Blocks() != nil { for _, block := range incoming.Blocks() { - go bs.blockReceive(sender, block) + go bs.blockstore.Put(block) // FIXME(brian): err ignored + go bs.notifications.Publish(block) } } if incoming.Wantlist() != nil { - for _, want := range incoming.Wantlist() { - // TODO(brian): return the block synchronously - go bs.peerWantsBlock(sender, want) + for _, key := range incoming.Wantlist() { + if bs.strategist.ShouldSendToPeer(key, sender) { + block, errBlockNotFound := bs.blockstore.Get(key) + if errBlockNotFound != nil { + // TODO(brian): log/return the error + continue + } + go bs.send(sender, *block) + } } } return nil, nil, errors.New("TODO implement") -- GitLab