Commit 4189d50d authored by Jeromy, committed by Juan Batiz-Benet

fix bug in diagnostics, and add more peers to closer peer responses

parent c77ed6d2
@@ -8,8 +8,22 @@ import (
 	"time"

 	"github.com/jbenet/go-ipfs/core"
+	diagn "github.com/jbenet/go-ipfs/diagnostics"
 )

+func PrintDiagnostics(info []*diagn.DiagInfo, out io.Writer) {
+	for _, i := range info {
+		fmt.Fprintf(out, "Peer: %s\n", i.ID)
+		fmt.Fprintf(out, "\tUp for: %s\n", i.LifeSpan.String())
+		fmt.Fprintf(out, "\tConnected To:\n")
+		for _, c := range i.Connections {
+			fmt.Fprintf(out, "\t%s\n\t\tLatency = %s\n", c.ID, c.Latency.String())
+		}
+		fmt.Fprintln(out)
+	}
+}
+
 func Diag(n *core.IpfsNode, args []string, opts map[string]interface{}, out io.Writer) error {
 	if n.Diagnostics == nil {
 		return errors.New("Cannot run diagnostic in offline mode!")
@@ -29,15 +43,7 @@ func Diag(n *core.IpfsNode, args []string, opts map[string]interface{}, out io.W
 			return err
 		}
 	} else {
-		for _, i := range info {
-			fmt.Fprintf(out, "Peer: %s\n", i.ID)
-			fmt.Fprintf(out, "\tUp for: %s\n", i.LifeSpan.String())
-			fmt.Fprintf(out, "\tConnected To:\n")
-			for _, c := range i.Connections {
-				fmt.Fprintf(out, "\t%s\n\t\tLatency = %s\n", c.ID, c.Latency.String())
-			}
-			fmt.Fprintln(out)
-		}
+		PrintDiagnostics(info, out)
 	}
 	return nil
 }
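
For reference, a minimal sketch of how the extracted helper composes with GetDiagnostic after this change. The wrapper name and the 20-second timeout are illustrative, not part of the commit; the call signatures come from the diff below.

// Illustrative wrapper, not from the commit: fetch diagnostics from the
// node, then pretty-print them with the newly extracted helper.
func printNodeDiagnostics(n *core.IpfsNode, out io.Writer) error {
	info, err := n.Diagnostics.GetDiagnostic(20 * time.Second) // timeout is illustrative
	if err != nil {
		return err
	}
	PrintDiagnostics(info, out)
	return nil
}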
@@ -48,7 +48,7 @@ type connDiagInfo struct {
 	ID string
 }

-type diagInfo struct {
+type DiagInfo struct {
 	ID string
 	Connections []connDiagInfo
 	Keys []string
@@ -56,7 +56,7 @@ type diagInfo struct {
 	CodeVersion string
 }

-func (di *diagInfo) Marshal() []byte {
+func (di *DiagInfo) Marshal() []byte {
 	b, err := json.Marshal(di)
 	if err != nil {
 		panic(err)
@@ -69,8 +69,8 @@ func (d *Diagnostics) getPeers() []*peer.Peer {
 	return d.network.GetPeerList()
 }

-func (d *Diagnostics) getDiagInfo() *diagInfo {
-	di := new(diagInfo)
+func (d *Diagnostics) getDiagInfo() *DiagInfo {
+	di := new(DiagInfo)
 	di.CodeVersion = "github.com/jbenet/go-ipfs"
 	di.ID = d.self.ID.Pretty()
 	di.LifeSpan = time.Since(d.birth)
@@ -88,7 +88,7 @@ func newID() string {
 	return string(id)
 }

-func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
+func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) {
 	log.Debug("Getting diagnostic.")
 	ctx, _ := context.WithTimeout(context.TODO(), timeout)
@@ -102,7 +102,7 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error)
 	peers := d.getPeers()
 	log.Debug("Sending diagnostic request to %d peers.", len(peers))

-	var out []*diagInfo
+	var out []*DiagInfo
 	di := d.getDiagInfo()
 	out = append(out, di)
@@ -134,15 +134,15 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error)
 	return out, nil
 }

-func AppendDiagnostics(data []byte, cur []*diagInfo) []*diagInfo {
+func AppendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo {
 	buf := bytes.NewBuffer(data)
 	dec := json.NewDecoder(buf)
 	for {
-		di := new(diagInfo)
+		di := new(DiagInfo)
 		err := dec.Decode(di)
 		if err != nil {
 			if err != io.EOF {
-				log.Error("error decoding diagInfo: %v", err)
+				log.Error("error decoding DiagInfo: %v", err)
 			}
 			break
 		}
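
The loop above relies on json.Decoder's ability to consume back-to-back JSON objects from one stream until io.EOF. A self-contained sketch of the same pattern, with a hypothetical record type standing in for DiagInfo:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// record is a stand-in for DiagInfo; only the decode pattern matters here.
type record struct {
	ID string
}

func decodeAll(data []byte) []*record {
	dec := json.NewDecoder(bytes.NewBuffer(data))
	var out []*record
	for {
		r := new(record)
		if err := dec.Decode(r); err != nil {
			// io.EOF just means the stream is exhausted; anything else
			// is a real decode error worth reporting.
			if err != io.EOF {
				fmt.Println("decode error:", err)
			}
			break
		}
		out = append(out, r)
	}
	return out
}

func main() {
	// Two concatenated JSON objects, as AppendDiagnostics receives them.
	data := []byte(`{"ID":"peerA"}{"ID":"peerB"}`)
	for _, r := range decodeAll(data) {
		fmt.Println(r.ID)
	}
}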
@@ -216,6 +216,7 @@ func (d *Diagnostics) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, e
 	sendcount := 0
 	for _, p := range d.getPeers() {
 		log.Debug("Sending diagnostic request to peer: %s", p)
+		sendcount++
 		go func(p *peer.Peer) {
 			out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
 			if err != nil {
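The added sendcount++ appears to be the diagnostics bug the commit message refers to: the counter is now bumped on the dispatching goroutine before each worker starts, so the collector knows how many replies to wait for. A sketch of that dispatch-counting pattern; fetch and respCh are stand-ins (the real collection loop is not shown in this diff):

// Sketch only: count requests before launching workers, then wait for
// exactly that many replies. Incrementing inside the goroutines would
// race with the wait loop and could undercount.
func fanOut(ctx context.Context, peers []*peer.Peer,
	fetch func(context.Context, *peer.Peer) ([]byte, error)) [][]byte {

	respCh := make(chan []byte)
	sendcount := 0
	for _, p := range peers {
		sendcount++
		go func(p *peer.Peer) {
			out, err := fetch(ctx, p) // stand-in for getDiagnosticFromPeer
			if err != nil {
				respCh <- nil
				return
			}
			respCh <- out
		}(p)
	}

	var replies [][]byte
	for i := 0; i < sendcount; i++ {
		replies = append(replies, <-respCh) // one receive per dispatched request
	}
	return replies
}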
@@ -76,7 +76,7 @@ func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sende

 // Connect to a new peer at the given address, ping and add to the routing table
 func (dht *IpfsDHT) Connect(ctx context.Context, npeer *peer.Peer) (*peer.Peer, error) {
-	log.Debug("Connect to new peer: %s\n", npeer)
+	log.Debug("Connect to new peer: %s", npeer)

 	// TODO(jbenet,whyrusleeping)
 	//
@@ -109,13 +109,13 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
 	mData := mes.Data()
 	if mData == nil {
-		// TODO handle/log err
+		log.Error("Message contained nil data.")
 		return nil
 	}

 	mPeer := mes.Peer()
 	if mPeer == nil {
-		// TODO handle/log err
+		log.Error("Message contained nil peer.")
 		return nil
 	}
@@ -123,7 +123,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
 	pmes := new(Message)
 	err := proto.Unmarshal(mData, pmes)
 	if err != nil {
-		// TODO handle/log err
+		log.Error("Error unmarshaling data")
 		return nil
 	}
@@ -138,25 +138,27 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
 	handler := dht.handlerForMsgType(pmes.GetType())
 	if handler == nil {
-		// TODO handle/log err
+		log.Error("got back nil handler from handlerForMsgType")
 		return nil
 	}

 	// dispatch handler.
 	rpmes, err := handler(mPeer, pmes)
 	if err != nil {
-		// TODO handle/log err
+		log.Error("handle message error: %s", err)
 		return nil
 	}

 	// if nil response, return it before serializing
 	if rpmes == nil {
+		log.Warning("Got back nil response from request.")
 		return nil
 	}

 	// serialize response msg
 	rmes, err := msg.FromObject(mPeer, rpmes)
 	if err != nil {
-		// TODO handle/log err
+		log.Error("serialze response error: %s", err)
 		return nil
 	}
@@ -197,6 +199,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message
 	return rpmes, nil
 }

+// putValueToNetwork stores the given key/value pair at the peer 'p'
 func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer,
 	key string, value []byte) error {
@@ -226,7 +229,7 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) e
 	}

 	log.Debug("%s putProvider: %s for %s", dht.self, p, key)
-	if *rpmes.Key != *pmes.Key {
+	if rpmes.GetKey() != pmes.GetKey() {
 		return errors.New("provider not added correctly")
 	}
@@ -261,23 +264,11 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
 	// Perhaps we were given closer peers
 	var peers []*peer.Peer
 	for _, pb := range pmes.GetCloserPeers() {
-		if peer.ID(pb.GetId()).Equal(dht.self.ID) {
-			continue
-		}
-
-		addr, err := ma.NewMultiaddr(pb.GetAddr())
+		pr, err := dht.addPeer(pb)
 		if err != nil {
-			log.Error("%v", err.Error())
+			log.Error("%s", err)
 			continue
 		}
-
-		// check if we already have this peer.
-		pr, _ := dht.peerstore.Get(peer.ID(pb.GetId()))
-		if pr == nil {
-			pr = &peer.Peer{ID: peer.ID(pb.GetId())}
-			dht.peerstore.Put(pr)
-		}
-		pr.AddAddress(addr) // idempotent
 		peers = append(peers, pr)
 	}
@@ -290,6 +281,27 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
 	return nil, nil, u.ErrNotFound
 }

+func (dht *IpfsDHT) addPeer(pb *Message_Peer) (*peer.Peer, error) {
+	if peer.ID(pb.GetId()).Equal(dht.self.ID) {
+		return nil, errors.New("cannot add self as peer")
+	}
+
+	addr, err := ma.NewMultiaddr(pb.GetAddr())
+	if err != nil {
+		return nil, err
+	}
+
+	// check if we already have this peer.
+	pr, _ := dht.peerstore.Get(peer.ID(pb.GetId()))
+	if pr == nil {
+		pr = &peer.Peer{ID: peer.ID(pb.GetId())}
+		dht.peerstore.Put(pr)
+	}
+	pr.AddAddress(addr) // idempotent
+
+	return pr, nil
+}
+
+// getValueSingle simply performs the get value RPC with the given parameters
 func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
 	key u.Key, level int) (*Message, error) {
@@ -327,6 +339,7 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
 	return nil, u.ErrNotFound
 }

+// getLocal attempts to retrieve the value from the datastore
 func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
 	dht.dslock.Lock()
 	defer dht.dslock.Unlock()
@@ -342,6 +355,7 @@ func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
 	return byt, nil
 }

+// putLocal stores the key value pair in the datastore
 func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
 	return dht.datastore.Put(key.DsKey(), value)
 }
@@ -419,39 +433,44 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer
 	return provArr
 }

-// nearestPeerToQuery returns the routing tables closest peers.
-func (dht *IpfsDHT) nearestPeerToQuery(pmes *Message) *peer.Peer {
+// nearestPeersToQuery returns the routing tables closest peers.
+func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []*peer.Peer {
 	level := pmes.GetClusterLevel()
 	cluster := dht.routingTables[level]

 	key := u.Key(pmes.GetKey())
-	closer := cluster.NearestPeer(kb.ConvertKey(key))
+	closer := cluster.NearestPeers(kb.ConvertKey(key), count)
 	return closer
 }

-// betterPeerToQuery returns nearestPeerToQuery, but iff closer than self.
-func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
-	closer := dht.nearestPeerToQuery(pmes)
+// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
+func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer {
+	closer := dht.nearestPeersToQuery(pmes, count)

 	// no node? nil
 	if closer == nil {
 		return nil
 	}

-	// == to self? nil
-	if closer.ID.Equal(dht.self.ID) {
-		log.Error("Attempted to return self! this shouldnt happen...")
-		return nil
+	// == to self? thats bad
+	for _, p := range closer {
+		if p.ID.Equal(dht.self.ID) {
+			log.Error("Attempted to return self! this shouldnt happen...")
+			return nil
+		}
 	}

-	// self is closer? nil
-	key := u.Key(pmes.GetKey())
-	if kb.Closer(dht.self.ID, closer.ID, key) {
-		return nil
+	var filtered []*peer.Peer
+	for _, p := range closer {
+		// must all be closer than self
+		key := u.Key(pmes.GetKey())
+		if !kb.Closer(dht.self.ID, p.ID, key) {
+			filtered = append(filtered, p)
+		}
 	}

-	// ok seems like a closer node.
-	return closer
+	// ok seems like closer nodes
+	return filtered
 }

 func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
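betterPeersToQuery keeps a candidate only when kb.Closer reports that self is not nearer to the key, i.e. only candidates at least as close as this node survive. Conceptually, kb.Closer compares XOR distances under the Kademlia metric; an illustrative standalone version over equal-length hashed keys (not the kbucket package's actual code) looks like:

// closer reports whether a is strictly nearer to key than b under the
// XOR metric. Inputs are assumed to be equal-length byte slices, as
// produced by kbucket's Convert* hashing helpers.
func closer(a, b, key []byte) bool {
	for i := range key {
		da := a[i] ^ key[i] // byte of a's XOR distance to key
		db := b[i] ^ key[i] // byte of b's XOR distance to key
		if da != db {
			return da < db // first differing byte decides the ordering
		}
	}
	return false // identical distances: a is not strictly closer
}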
@@ -13,6 +13,8 @@ import (
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
 )

+var CloserPeerCount = 4
+
 // dhthandler specifies the signature of functions that handle DHT messages.
 type dhtHandler func(*peer.Peer, *Message) (*Message, error)
@@ -83,10 +85,12 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
 	}

 	// Find closest peer on given cluster to desired key and reply with that info
-	closer := dht.betterPeerToQuery(pmes)
+	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
 	if closer != nil {
-		log.Debug("handleGetValue returning a closer peer: '%s'\n", closer)
-		resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
+		for _, p := range closer {
+			log.Debug("handleGetValue returning closer peer: '%s'", p)
+		}
+		resp.CloserPeers = peersToPBPeers(closer)
 	}

 	return resp, nil
@@ -109,27 +113,31 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) {
 func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) {
 	resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel())
-	var closest *peer.Peer
+	var closest []*peer.Peer

 	// if looking for self... special case where we send it on CloserPeers.
 	if peer.ID(pmes.GetKey()).Equal(dht.self.ID) {
-		closest = dht.self
+		closest = []*peer.Peer{dht.self}
 	} else {
-		closest = dht.betterPeerToQuery(pmes)
+		closest = dht.betterPeersToQuery(pmes, CloserPeerCount)
 	}

 	if closest == nil {
-		log.Error("handleFindPeer: could not find anything.\n")
+		log.Error("handleFindPeer: could not find anything.")
 		return resp, nil
 	}

-	if len(closest.Addresses) == 0 {
-		log.Error("handleFindPeer: no addresses for connected peer...\n")
-		return resp, nil
+	var withAddresses []*peer.Peer
+	for _, p := range closest {
+		if len(p.Addresses) > 0 {
+			withAddresses = append(withAddresses, p)
+		}
 	}

-	log.Debug("handleFindPeer: sending back '%s'\n", closest)
-	resp.CloserPeers = peersToPBPeers([]*peer.Peer{closest})
+	for _, p := range withAddresses {
+		log.Debug("handleFindPeer: sending back '%s'", p)
+	}
+	resp.CloserPeers = peersToPBPeers(withAddresses)
 	return resp, nil
 }
@@ -157,9 +165,9 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, e
 	}

 	// Also send closer peers.
-	closer := dht.betterPeerToQuery(pmes)
+	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
 	if closer != nil {
-		resp.CloserPeers = peersToPBPeers(closer)
+		resp.CloserPeers = peersToPBPeers(closer)
 	}

 	return resp, nil
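After this commit, handleGetValue and handleGetProviders share the same tail: ask for up to CloserPeerCount candidates and attach whatever survives the better-than-self filter (handleFindPeer additionally drops candidates without known addresses). A hypothetical helper capturing that shared tail, for illustration only; the handlers above inline this logic:

// attachCloserPeers is not in the commit; it just names the pattern the
// handlers repeat: fetch up to CloserPeerCount better peers and, if any
// survive the filter, attach them to the response.
func (dht *IpfsDHT) attachCloserPeers(resp, pmes *Message) {
	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
	if closer != nil {
		resp.CloserPeers = peersToPBPeers(closer)
	}
}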