From 41c124a2825e7b1860464f090089799e4b3863d8 Mon Sep 17 00:00:00 2001 From: Jeromy <jeromyj@gmail.com> Date: Wed, 6 Aug 2014 18:37:45 -0700 Subject: [PATCH] worked on gathering data for diagnostic messages and some other misc cleanup --- peer/peer.go | 2 + routing/dht/bucket.go | 5 +++ routing/dht/dht.go | 87 +++++++++++++++++++++----------------- routing/dht/dht_test.go | 4 -- routing/dht/messages.pb.go | 8 ++++ routing/dht/messages.proto | 1 + routing/dht/pDHTMessage.go | 2 + routing/dht/routing.go | 27 ++++++++++++ routing/dht/table.go | 16 ++++--- routing/dht/util.go | 4 +- 10 files changed, 107 insertions(+), 49 deletions(-) diff --git a/peer/peer.go b/peer/peer.go index 0357ab55..a637274c 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -2,6 +2,7 @@ package peer import ( "encoding/hex" + "time" u "github.com/jbenet/go-ipfs/util" ma "github.com/jbenet/go-multiaddr" @@ -30,6 +31,7 @@ type Map map[u.Key]*Peer type Peer struct { ID ID Addresses []*ma.Multiaddr + Distance time.Duration } // Key returns the ID as a Key (string) for maps. diff --git a/routing/dht/bucket.go b/routing/dht/bucket.go index 120ed29a..996d299d 100644 --- a/routing/dht/bucket.go +++ b/routing/dht/bucket.go @@ -61,3 +61,8 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket { } return (*Bucket)(out) } + +func (b *Bucket) getIter() *list.Element { + bucket_list := (*list.List)(b) + return bucket_list.Front() +} diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 9b4854cf..44a56831 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -34,7 +34,7 @@ type IpfsDHT struct { // Map keys to peers that can provide their value // TODO: implement a TTL on each of these keys - providers map[u.Key][]*peer.Peer + providers map[u.Key][]*providerInfo providerLock sync.RWMutex // map of channels waiting for reply messages @@ -43,6 +43,9 @@ type IpfsDHT struct { // Signal to shutdown dht shutdown chan struct{} + + // When this peer started up + birth time.Time } // Create a new DHT object with the given peer as the 'local' host @@ -61,9 +64,10 @@ func NewDHT(p *peer.Peer) (*IpfsDHT, error) { dht.datastore = ds.NewMapDatastore() dht.self = p dht.listeners = make(map[uint64]chan *swarm.Message) - dht.providers = make(map[u.Key][]*peer.Peer) + dht.providers = make(map[u.Key][]*providerInfo) dht.shutdown = make(chan struct{}) dht.routes = NewRoutingTable(20, convertPeerID(p.ID)) + dht.birth = time.Now() return dht, nil } @@ -121,6 +125,8 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { // NOTE: this function is just a quick sketch func (dht *IpfsDHT) handleMessages() { u.DOut("Begin message handling routine") + + checkTimeouts := time.NewTicker(time.Minute * 5) for { select { case mes,ok := <-dht.network.Chan.Incoming: @@ -157,7 +163,10 @@ func (dht *IpfsDHT) handleMessages() { } // - u.DOut("Got message type: '%s' [id = %x]", DHTMessage_MessageType_name[int32(pmes.GetType())], pmes.GetId()) + u.DOut("[peer: %s]", dht.self.ID.Pretty()) + u.DOut("Got message type: '%s' [id = %x, from = %s]", + DHTMessage_MessageType_name[int32(pmes.GetType())], + pmes.GetId(), mes.Peer.ID.Pretty()) switch pmes.GetType() { case DHTMessage_GET_VALUE: dht.handleGetValue(mes.Peer, pmes) @@ -171,35 +180,57 @@ func (dht *IpfsDHT) handleMessages() { dht.handleGetProviders(mes.Peer, pmes) case DHTMessage_PING: dht.handlePing(mes.Peer, pmes) + case DHTMessage_DIAGNOSTIC: + // TODO: network diagnostic messages } case err := <-dht.network.Chan.Errors: u.DErr("dht err: %s", err) case <-dht.shutdown: + checkTimeouts.Stop() return + case <-checkTimeouts.C: + dht.providerLock.Lock() + for k,parr := range dht.providers { + var cleaned []*providerInfo + for _,v := range parr { + if time.Since(v.Creation) < time.Hour { + cleaned = append(cleaned, v) + } + } + dht.providers[k] = cleaned + } + dht.providerLock.Unlock() } } } func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) { dskey := ds.NewKey(pmes.GetKey()) + var resp *pDHTMessage i_val, err := dht.datastore.Get(dskey) if err == nil { - resp := &pDHTMessage{ + resp = &pDHTMessage{ Response: true, Id: *pmes.Id, Key: *pmes.Key, Value: i_val.([]byte), + Success: true, } - - mes := swarm.NewMessage(p, resp.ToProtobuf()) - dht.network.Chan.Outgoing <- mes } else if err == ds.ErrNotFound { // Find closest peer(s) to desired key and reply with that info - // TODO: this will need some other metadata in the protobuf message - // to signal to the querying peer that the data its receiving - // is actually a list of other peer + closer := dht.routes.NearestPeer(convertKey(u.Key(pmes.GetKey()))) + resp = &pDHTMessage{ + Response: true, + Id: *pmes.Id, + Key: *pmes.Key, + Value: closer.ID, + Success: false, + } } + + mes := swarm.NewMessage(p, resp.ToProtobuf()) + dht.network.Chan.Outgoing <- mes } // Store a value in this peer local storage @@ -263,14 +294,14 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) { // This is just a quick hack, formalize method of sending addrs later addrs := make(map[u.Key]string) for _,prov := range providers { - ma := prov.NetAddress("tcp") + ma := prov.Value.NetAddress("tcp") str,err := ma.String() if err != nil { u.PErr("Error: %s", err) continue } - addrs[prov.Key()] = str + addrs[prov.Value.Key()] = str } data,err := json.Marshal(addrs) @@ -290,6 +321,11 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) { dht.network.Chan.Outgoing <-mes } +type providerInfo struct { + Creation time.Time + Value *peer.Peer +} + func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) { //TODO: need to implement TTLs on providers key := u.Key(pmes.GetKey()) @@ -324,35 +360,10 @@ func (dht *IpfsDHT) Halt() { dht.network.Close() } -// Ping a peer, log the time it took -func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { - // Thoughts: maybe this should accept an ID and do a peer lookup? - u.DOut("Enter Ping.") - - pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING} - mes := swarm.NewMessage(p, pmes.ToProtobuf()) - - before := time.Now() - response_chan := dht.ListenFor(pmes.Id) - dht.network.Chan.Outgoing <- mes - - tout := time.After(timeout) - select { - case <-response_chan: - roundtrip := time.Since(before) - u.POut("Ping took %s.", roundtrip.String()) - return nil - case <-tout: - // Timed out, think about removing peer from network - u.DOut("Ping peer timed out.") - return u.ErrTimeout - } -} - func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) { u.DOut("Adding %s as provider for '%s'", p.Key().Pretty(), key) dht.providerLock.Lock() provs := dht.providers[key] - dht.providers[key] = append(provs, p) + dht.providers[key] = append(provs, &providerInfo{time.Now(), p}) dht.providerLock.Unlock() } diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 6217c29f..b57ca3f7 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -6,13 +6,9 @@ import ( ma "github.com/jbenet/go-multiaddr" u "github.com/jbenet/go-ipfs/util" - "fmt" - "time" ) -var _ = fmt.Println - func TestPing(t *testing.T) { u.Debug = false addr_a,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234") diff --git a/routing/dht/messages.pb.go b/routing/dht/messages.pb.go index e95f487c..4f427efa 100644 --- a/routing/dht/messages.pb.go +++ b/routing/dht/messages.pb.go @@ -74,6 +74,7 @@ type DHTMessage struct { Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` + Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -116,6 +117,13 @@ func (m *DHTMessage) GetResponse() bool { return false } +func (m *DHTMessage) GetSuccess() bool { + if m != nil && m.Success != nil { + return *m.Success + } + return false +} + func init() { proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value) } diff --git a/routing/dht/messages.proto b/routing/dht/messages.proto index a9a7fd3c..278a9520 100644 --- a/routing/dht/messages.proto +++ b/routing/dht/messages.proto @@ -22,4 +22,5 @@ message DHTMessage { // Signals whether or not this message is a response to another message optional bool response = 5; + optional bool success = 6; } diff --git a/routing/dht/pDHTMessage.go b/routing/dht/pDHTMessage.go index 65c03b1f..bfe37d35 100644 --- a/routing/dht/pDHTMessage.go +++ b/routing/dht/pDHTMessage.go @@ -7,6 +7,7 @@ type pDHTMessage struct { Value []byte Response bool Id uint64 + Success bool } func (m *pDHTMessage) ToProtobuf() *DHTMessage { @@ -19,6 +20,7 @@ func (m *pDHTMessage) ToProtobuf() *DHTMessage { pmes.Key = &m.Key pmes.Response = &m.Response pmes.Id = &m.Id + pmes.Success = &m.Success return pmes } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 48949835..82c88960 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -208,3 +208,30 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error return found_peer, nil } } + +// Ping a peer, log the time it took +func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { + // Thoughts: maybe this should accept an ID and do a peer lookup? + u.DOut("Enter Ping.") + + pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING} + mes := swarm.NewMessage(p, pmes.ToProtobuf()) + + before := time.Now() + response_chan := dht.ListenFor(pmes.Id) + dht.network.Chan.Outgoing <- mes + + tout := time.After(timeout) + select { + case <-response_chan: + roundtrip := time.Since(before) + p.Distance = roundtrip //TODO: This isnt threadsafe + u.POut("Ping took %s.", roundtrip.String()) + return nil + case <-tout: + // Timed out, think about removing peer from network + u.DOut("Ping peer timed out.") + dht.Unlisten(pmes.Id) + return u.ErrTimeout + } +} diff --git a/routing/dht/table.go b/routing/dht/table.go index ce8fdbc2..07ed70cb 100644 --- a/routing/dht/table.go +++ b/routing/dht/table.go @@ -1,12 +1,10 @@ package dht import ( - "encoding/hex" "container/list" "sort" peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" ) // RoutingTable defines the routing table. @@ -114,7 +112,6 @@ func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer { // Returns a list of the 'count' closest peers to the given ID func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { - u.POut("Searching table, size = %d", rt.Size()) cpl := xor(id, rt.local).commonPrefixLen() // Get bucket at cpl index or last bucket @@ -148,8 +145,6 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { var out []*peer.Peer for i := 0; i < count && i < peerArr.Len(); i++ { out = append(out, peerArr[i].p) - u.POut("peer out: %s - %s", peerArr[i].p.ID.Pretty(), - hex.EncodeToString(xor(id, convertPeerID(peerArr[i].p.ID)))) } return out @@ -163,3 +158,14 @@ func (rt *RoutingTable) Size() int { } return tot } + +// NOTE: This is potentially unsafe... use at your own risk +func (rt *RoutingTable) listpeers() []*peer.Peer { + var peers []*peer.Peer + for _,buck := range rt.Buckets { + for e := buck.getIter(); e != nil; e = e.Next() { + peers = append(peers, e.Value.(*peer.Peer)) + } + } + return peers +} diff --git a/routing/dht/util.go b/routing/dht/util.go index eed8d930..2adc8b76 100644 --- a/routing/dht/util.go +++ b/routing/dht/util.go @@ -11,8 +11,8 @@ import ( // ID for IpfsDHT should be a byte slice, to allow for simpler operations // (xor). DHT ids are based on the peer.IDs. // -// NOTE: peer.IDs are biased because they are multihashes (first bytes -// biased). Thus, may need to re-hash keys (uniform dist). TODO(jbenet) +// The type dht.ID signifies that its contents have been hashed from either a +// peer.ID or a util.Key. This unifies the keyspace type ID []byte func (id ID) Equal(other ID) bool { -- GitLab