Commit 456719ed authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #523 from jbenet/feat/ping

Implement ipfs ping
parents 6699d929 0f4f91c2
package commands
import (
"bytes"
"fmt"
"io"
"strings"
"time"
cmds "github.com/jbenet/go-ipfs/commands"
core "github.com/jbenet/go-ipfs/core"
peer "github.com/jbenet/go-ipfs/p2p/peer"
u "github.com/jbenet/go-ipfs/util"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
const kPingTimeout = 10 * time.Second
type PingResult struct {
Success bool
Time time.Duration
Text string
}
var PingCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "send echo request packets to IPFS hosts",
Synopsis: `
Send pings to a peer using the routing system to discover its address
`,
ShortDescription: `
ipfs ping is a tool to find a node (in the routing system),
send pings, wait for pongs, and print out round-trip latency information.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("peer ID", true, true, "ID of peer to be pinged"),
},
Options: []cmds.Option{
cmds.IntOption("count", "n", "number of ping messages to send"),
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(chan interface{})
if !ok {
return nil, u.ErrCast()
}
marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*PingResult)
if !ok {
return nil, u.ErrCast()
}
buf := new(bytes.Buffer)
if len(obj.Text) > 0 {
buf = bytes.NewBufferString(obj.Text + "\n")
} else if obj.Success {
fmt.Fprintf(buf, "Pong received: time=%.2f ms\n", obj.Time.Seconds()*1000)
} else {
fmt.Fprintf(buf, "Pong failed\n")
}
return buf, nil
}
return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
}, nil
},
},
Run: func(req cmds.Request) (interface{}, error) {
n, err := req.Context().GetNode()
if err != nil {
return nil, err
}
// Must be online!
if !n.OnlineMode() {
return nil, errNotOnline
}
addr, peerID, err := ParsePeerParam(req.Arguments()[0])
if err != nil {
return nil, err
}
if addr != nil {
n.Peerstore.AddAddress(peerID, addr)
}
// Set up number of pings
numPings := 10
val, found, err := req.Option("count").Int()
if err != nil {
return nil, err
}
if found {
numPings = val
}
outChan := make(chan interface{})
go pingPeer(n, peerID, numPings, outChan)
return outChan, nil
},
Type: PingResult{},
}
func pingPeer(n *core.IpfsNode, pid peer.ID, numPings int, outChan chan interface{}) {
defer close(outChan)
if len(n.Peerstore.Addresses(pid)) == 0 {
// Make sure we can find the node in question
outChan <- &PingResult{
Text: fmt.Sprintf("Looking up peer %s", pid.Pretty()),
}
// TODO: get master context passed in
ctx, _ := context.WithTimeout(context.TODO(), kPingTimeout)
p, err := n.Routing.FindPeer(ctx, pid)
if err != nil {
outChan <- &PingResult{Text: fmt.Sprintf("Peer lookup error: %s", err)}
return
}
n.Peerstore.AddPeerInfo(p)
}
outChan <- &PingResult{Text: fmt.Sprintf("PING %s.", pid.Pretty())}
var total time.Duration
for i := 0; i < numPings; i++ {
ctx, _ := context.WithTimeout(context.TODO(), kPingTimeout)
took, err := n.Routing.Ping(ctx, pid)
if err != nil {
log.Errorf("Ping error: %s", err)
outChan <- &PingResult{Text: fmt.Sprintf("Ping error: %s", err)}
break
}
outChan <- &PingResult{
Success: true,
Time: took,
}
total += took
time.Sleep(time.Second)
}
averagems := total.Seconds() * 1000 / float64(numPings)
outChan <- &PingResult{
Text: fmt.Sprintf("Average latency: %.2fms", averagems),
}
}
func ParsePeerParam(text string) (ma.Multiaddr, peer.ID, error) {
// to be replaced with just multiaddr parsing, once ptp is a multiaddr protocol
idx := strings.LastIndex(text, "/")
if idx == -1 {
pid, err := peer.IDB58Decode(text)
if err != nil {
return nil, "", err
}
return nil, pid, nil
}
addrS := text[:idx]
peeridS := text[idx+1:]
var maddr ma.Multiaddr
var pid peer.ID
// make sure addrS parses as a multiaddr.
if len(addrS) > 0 {
var err error
maddr, err = ma.NewMultiaddr(addrS)
if err != nil {
return nil, "", err
}
}
// make sure idS parses as a peer.ID
var err error
pid, err = peer.IDB58Decode(peeridS)
if err != nil {
return nil, "", err
}
return maddr, pid, nil
}
......@@ -81,6 +81,7 @@ var rootSubcommands = map[string]*cmds.Command{
"pin": PinCmd,
"refs": RefsCmd,
"swarm": SwarmCmd,
"ping": PingCmd,
"update": UpdateCmd,
"version": VersionCmd,
}
......
......@@ -103,8 +103,8 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
// Ping new peer to register in their routing table
// NOTE: this should be done better...
if err := dht.Ping(ctx, npeer); err != nil {
return fmt.Errorf("failed to ping newly connected peer: %s\n", err)
if _, err := dht.Ping(ctx, npeer); err != nil {
return fmt.Errorf("failed to ping newly connected peer: %s", err)
}
log.Event(ctx, "connect", dht.self, npeer)
dht.Update(ctx, npeer)
......@@ -329,7 +329,7 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
peers := dht.routingTable.NearestPeers(kb.ConvertKey(u.Key(id)), 5)
for _, p := range peers {
ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
err := dht.Ping(ctx, p)
_, err := dht.Ping(ctx, p)
if err != nil {
log.Errorf("Ping error: %s", err)
}
......
......@@ -116,12 +116,12 @@ func TestPing(t *testing.T) {
//Test that we can ping the node
ctxT, _ := context.WithTimeout(ctx, 100*time.Millisecond)
if err := dhtA.Ping(ctxT, peerB); err != nil {
if _, err := dhtA.Ping(ctxT, peerB); err != nil {
t.Fatal(err)
}
ctxT, _ = context.WithTimeout(ctx, 100*time.Millisecond)
if err := dhtB.Ping(ctxT, peerA); err != nil {
if _, err := dhtB.Ping(ctxT, peerA); err != nil {
t.Fatal(err)
}
}
......
......@@ -3,6 +3,7 @@ package dht
import (
"math"
"sync"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
......@@ -434,12 +435,14 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
}
// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) (time.Duration, error) {
// Thoughts: maybe this should accept an ID and do a peer lookup?
log.Debugf("ping %s start", p)
before := time.Now()
pmes := pb.NewMessage(pb.Message_PING, "", 0)
_, err := dht.sendRequest(ctx, p, pmes)
log.Debugf("ping %s end (err = %s)", p, err)
return err
return time.Now().Sub(before), err
}
......@@ -2,6 +2,7 @@ package mockrouting
import (
"errors"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
......@@ -79,4 +80,8 @@ func (c *client) Provide(_ context.Context, key u.Key) error {
return c.server.Announce(info, key)
}
func (c *client) Ping(ctx context.Context, p peer.ID) (time.Duration, error) {
return 0, nil
}
var _ routing.IpfsRouting = &client{}
......@@ -3,6 +3,7 @@ package routing
import (
"errors"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
......@@ -36,4 +37,7 @@ type IpfsRouting interface {
// FindPeer searches for a peer with given ID, returns a peer.PeerInfo
// with relevant addresses.
FindPeer(context.Context, peer.ID) (peer.PeerInfo, error)
// Ping a peer, log the time it took
Ping(context.Context, peer.ID) (time.Duration, error)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment