diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go new file mode 100644 index 0000000000000000000000000000000000000000..bb9b14025d907cb9789f7afa18056795eebed56b --- /dev/null +++ b/core/commands/bitswap.go @@ -0,0 +1,86 @@ +package commands + +import ( + "bytes" + "encoding/json" + cmds "github.com/jbenet/go-ipfs/commands" + bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" + u "github.com/jbenet/go-ipfs/util" + "io" +) + +var BitswapCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "A set of commands to manipulate the bitswap agent", + ShortDescription: ``, + }, + Subcommands: map[string]*cmds.Command{ + "wantlist": showWantlistCmd, + "stat": bitswapStatCmd, + }, +} + +var showWantlistCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Show blocks currently on the wantlist", + ShortDescription: ` +Print out all blocks currently on the bitswap wantlist for the local peer`, + }, + Type: KeyList{}, + Run: func(req cmds.Request, res cmds.Response) { + nd, err := req.Context().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + res.SetOutput(&KeyList{nd.Exchange.GetWantlist()}) + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: KeyListTextMarshaler, + }, +} + +var bitswapStatCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "show some diagnostic information on the bitswap agent", + ShortDescription: ``, + }, + Type: bitswap.Stat{}, + Run: func(req cmds.Request, res cmds.Response) { + nd, err := req.Context().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + bs, ok := nd.Exchange.(*bitswap.Bitswap) + if !ok { + res.SetError(u.ErrCast(), cmds.ErrNormal) + return + } + + st, err := bs.Stat() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + res.SetOutput(st) + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + out, ok := res.Output().(*bitswap.Stat) + if !ok { + return nil, u.ErrCast() + } + buf := new(bytes.Buffer) + enc := json.NewEncoder(buf) + err := enc.Encode(out) + if err != nil { + return nil, err + } + return buf, nil + }, + }, +} diff --git a/core/commands/root.go b/core/commands/root.go index 8040a24344d97eee246f5802f606a424f550f5fb..b88d2e4f1cf4b9f6eb352348e4d2228d71f09702 100644 --- a/core/commands/root.go +++ b/core/commands/root.go @@ -98,7 +98,7 @@ var rootSubcommands = map[string]*cmds.Command{ "swarm": SwarmCmd, "update": UpdateCmd, "version": VersionCmd, - "wantlist": WantlistCmd, + "bitswap": BitswapCmd, } func init() { diff --git a/core/commands/wantlist.go b/core/commands/wantlist.go deleted file mode 100644 index 35c7f0a71b9d3cc10d625a50b869bce055fdc6c5..0000000000000000000000000000000000000000 --- a/core/commands/wantlist.go +++ /dev/null @@ -1,34 +0,0 @@ -package commands - -import cmds "github.com/jbenet/go-ipfs/commands" - -var WantlistCmd = &cmds.Command{ - Helptext: cmds.HelpText{ - Tagline: "A set of commands to work with the bitswap wantlist", - ShortDescription: ``, - }, - Subcommands: map[string]*cmds.Command{ - "show": showWantlistCmd, - }, -} - -var showWantlistCmd = &cmds.Command{ - Helptext: cmds.HelpText{ - Tagline: "Show blocks currently on the wantlist", - ShortDescription: ` -Print out all blocks currently on the bitswap wantlist for the local peer`, - }, - Type: KeyList{}, - Run: func(req cmds.Request, res cmds.Response) { - nd, err := req.Context().GetNode() - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - - res.SetOutput(&KeyList{nd.Exchange.GetWantlist()}) - }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: KeyListTextMarshaler, - }, -} diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 1101ffd9b1c573adc4215d538d1ed85ff478b957..d40a13efa3715ec117579645ddc82a7e740fcfb8 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -79,7 +79,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, px.Close() }() - bs := &bitswap{ + bs := &Bitswap{ self: p, blockstore: bstore, notifications: notif, @@ -97,8 +97,8 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, return bs } -// bitswap instances implement the bitswap protocol. -type bitswap struct { +// Bitswap instances implement the bitswap protocol. +type Bitswap struct { // the ID of the peer to act on behalf of self peer.ID @@ -133,7 +133,7 @@ type blockRequest struct { // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. -func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) { +func (bs *Bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) { // Any async work initiated by this function must end when this function // returns. To ensure this, derive a new context. Note that it is okay to @@ -179,7 +179,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err // NB: Your request remains open until the context expires. To conserve // resources, provide a context with a reasonably short deadline (ie. not one // that lasts throughout the lifetime of the server) -func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) { +func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) { select { case <-bs.process.Closing(): return nil, errors.New("bitswap is closed") @@ -201,7 +201,7 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. // HasBlock announces the existance of a block to this bitswap service. The // service will potentially notify its peers. -func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { +func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { log.Event(ctx, "hasBlock", blk) select { case <-bs.process.Closing(): @@ -221,7 +221,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { return nil } -func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { +func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { set := pset.New() wg := sync.WaitGroup{} for peerToQuery := range peers { @@ -242,7 +242,7 @@ func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMe return nil } -func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error { +func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error { message := bsmsg.New() message.SetFull(true) for _, wanted := range bs.wantlist.Entries() { @@ -251,7 +251,7 @@ func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID return bs.sendWantlistMsgToPeers(ctx, message, peers) } -func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) { +func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -286,7 +286,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli } // TODO(brian): handle errors -func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) ( +func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) ( peer.ID, bsmsg.BitSwapMessage) { defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() @@ -325,7 +325,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg } // Connected/Disconnected warns bitswap about peer connections -func (bs *bitswap) PeerConnected(p peer.ID) { +func (bs *Bitswap) PeerConnected(p peer.ID) { // TODO: add to clientWorker?? peers := make(chan peer.ID, 1) peers <- p @@ -337,11 +337,11 @@ func (bs *bitswap) PeerConnected(p peer.ID) { } // Connected/Disconnected warns bitswap about peer connections -func (bs *bitswap) PeerDisconnected(p peer.ID) { +func (bs *Bitswap) PeerDisconnected(p peer.ID) { bs.engine.PeerDisconnected(p) } -func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { +func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { if len(bkeys) < 1 { return } @@ -358,7 +358,7 @@ func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { } } -func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { +func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { if len(bkeys) < 1 { return } @@ -383,7 +383,7 @@ func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { wg.Wait() } -func (bs *bitswap) ReceiveError(err error) { +func (bs *Bitswap) ReceiveError(err error) { log.Debugf("Bitswap ReceiveError: %s", err) // TODO log the network error // TODO bubble the network error up to the parent context/error logger @@ -391,7 +391,7 @@ func (bs *bitswap) ReceiveError(err error) { // send strives to ensure that accounting is always performed when a message is // sent -func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error { +func (bs *Bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error { defer log.EventBegin(ctx, "sendMessage", p, m).Done() if err := bs.network.SendMessage(ctx, p, m); err != nil { return errors.Wrap(err) @@ -399,11 +399,11 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) return bs.engine.MessageSent(p, m) } -func (bs *bitswap) Close() error { +func (bs *Bitswap) Close() error { return bs.process.Close() } -func (bs *bitswap) GetWantlist() []u.Key { +func (bs *Bitswap) GetWantlist() []u.Key { var out []u.Key for _, e := range bs.wantlist.Entries() { out = append(out, e.Key) diff --git a/exchange/bitswap/stat.go b/exchange/bitswap/stat.go new file mode 100644 index 0000000000000000000000000000000000000000..f3c213f03c043a2b7be61348e6b2f0344c4a13b7 --- /dev/null +++ b/exchange/bitswap/stat.go @@ -0,0 +1,22 @@ +package bitswap + +import ( + peer "github.com/jbenet/go-ipfs/p2p/peer" + u "github.com/jbenet/go-ipfs/util" +) + +type Stat struct { + ProvideBufLen int + Wantlist []u.Key + Peers []peer.ID +} + +func (bs *Bitswap) Stat() (*Stat, error) { + st := new(Stat) + st.ProvideBufLen = len(bs.newBlocks) + st.Wantlist = bs.GetWantlist() + + st.Peers = bs.engine.Peers() + + return st, nil +} diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 3753edb629cf6ccdd629723a5040a8d8befc35b3..1b28aedb1380569598fef9f1a3c1448d4f87e398 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -8,7 +8,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) -func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) { +func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { // Start up a worker to handle block requests this node is making px.Go(func(px process.Process) { bs.clientWorker(ctx) @@ -34,7 +34,7 @@ func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) { } } -func (bs *bitswap) taskWorker(ctx context.Context) { +func (bs *Bitswap) taskWorker(ctx context.Context) { defer log.Info("bitswap task worker shutting down...") for { select { @@ -55,7 +55,7 @@ func (bs *bitswap) taskWorker(ctx context.Context) { } } -func (bs *bitswap) provideWorker(ctx context.Context) { +func (bs *Bitswap) provideWorker(ctx context.Context) { for { select { case blk, ok := <-bs.newBlocks: @@ -75,7 +75,7 @@ func (bs *bitswap) provideWorker(ctx context.Context) { } // TODO ensure only one active request per key -func (bs *bitswap) clientWorker(parent context.Context) { +func (bs *Bitswap) clientWorker(parent context.Context) { defer log.Info("bitswap client worker shutting down...") for { @@ -115,7 +115,7 @@ func (bs *bitswap) clientWorker(parent context.Context) { } } -func (bs *bitswap) rebroadcastWorker(parent context.Context) { +func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ctx, cancel := context.WithCancel(parent) defer cancel()