Commit 41c124a2 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

worked on gathering data for diagnostic messages and some other misc cleanup

parent bd9fc2b7
......@@ -2,6 +2,7 @@ package peer
import (
"encoding/hex"
"time"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
......@@ -30,6 +31,7 @@ type Map map[u.Key]*Peer
type Peer struct {
ID ID
Addresses []*ma.Multiaddr
Distance time.Duration
}
// Key returns the ID as a Key (string) for maps.
......
......@@ -61,3 +61,8 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket {
}
return (*Bucket)(out)
}
func (b *Bucket) getIter() *list.Element {
bucket_list := (*list.List)(b)
return bucket_list.Front()
}
......@@ -34,7 +34,7 @@ type IpfsDHT struct {
// Map keys to peers that can provide their value
// TODO: implement a TTL on each of these keys
providers map[u.Key][]*peer.Peer
providers map[u.Key][]*providerInfo
providerLock sync.RWMutex
// map of channels waiting for reply messages
......@@ -43,6 +43,9 @@ type IpfsDHT struct {
// Signal to shutdown dht
shutdown chan struct{}
// When this peer started up
birth time.Time
}
// Create a new DHT object with the given peer as the 'local' host
......@@ -61,9 +64,10 @@ func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
dht.datastore = ds.NewMapDatastore()
dht.self = p
dht.listeners = make(map[uint64]chan *swarm.Message)
dht.providers = make(map[u.Key][]*peer.Peer)
dht.providers = make(map[u.Key][]*providerInfo)
dht.shutdown = make(chan struct{})
dht.routes = NewRoutingTable(20, convertPeerID(p.ID))
dht.birth = time.Now()
return dht, nil
}
......@@ -121,6 +125,8 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
// NOTE: this function is just a quick sketch
func (dht *IpfsDHT) handleMessages() {
u.DOut("Begin message handling routine")
checkTimeouts := time.NewTicker(time.Minute * 5)
for {
select {
case mes,ok := <-dht.network.Chan.Incoming:
......@@ -157,7 +163,10 @@ func (dht *IpfsDHT) handleMessages() {
}
//
u.DOut("Got message type: '%s' [id = %x]", DHTMessage_MessageType_name[int32(pmes.GetType())], pmes.GetId())
u.DOut("[peer: %s]", dht.self.ID.Pretty())
u.DOut("Got message type: '%s' [id = %x, from = %s]",
DHTMessage_MessageType_name[int32(pmes.GetType())],
pmes.GetId(), mes.Peer.ID.Pretty())
switch pmes.GetType() {
case DHTMessage_GET_VALUE:
dht.handleGetValue(mes.Peer, pmes)
......@@ -171,35 +180,57 @@ func (dht *IpfsDHT) handleMessages() {
dht.handleGetProviders(mes.Peer, pmes)
case DHTMessage_PING:
dht.handlePing(mes.Peer, pmes)
case DHTMessage_DIAGNOSTIC:
// TODO: network diagnostic messages
}
case err := <-dht.network.Chan.Errors:
u.DErr("dht err: %s", err)
case <-dht.shutdown:
checkTimeouts.Stop()
return
case <-checkTimeouts.C:
dht.providerLock.Lock()
for k,parr := range dht.providers {
var cleaned []*providerInfo
for _,v := range parr {
if time.Since(v.Creation) < time.Hour {
cleaned = append(cleaned, v)
}
}
dht.providers[k] = cleaned
}
dht.providerLock.Unlock()
}
}
}
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
var resp *pDHTMessage
i_val, err := dht.datastore.Get(dskey)
if err == nil {
resp := &pDHTMessage{
resp = &pDHTMessage{
Response: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: i_val.([]byte),
Success: true,
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <- mes
} else if err == ds.ErrNotFound {
// Find closest peer(s) to desired key and reply with that info
// TODO: this will need some other metadata in the protobuf message
// to signal to the querying peer that the data its receiving
// is actually a list of other peer
closer := dht.routes.NearestPeer(convertKey(u.Key(pmes.GetKey())))
resp = &pDHTMessage{
Response: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: closer.ID,
Success: false,
}
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <- mes
}
// Store a value in this peer local storage
......@@ -263,14 +294,14 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
// This is just a quick hack, formalize method of sending addrs later
addrs := make(map[u.Key]string)
for _,prov := range providers {
ma := prov.NetAddress("tcp")
ma := prov.Value.NetAddress("tcp")
str,err := ma.String()
if err != nil {
u.PErr("Error: %s", err)
continue
}
addrs[prov.Key()] = str
addrs[prov.Value.Key()] = str
}
data,err := json.Marshal(addrs)
......@@ -290,6 +321,11 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
dht.network.Chan.Outgoing <-mes
}
type providerInfo struct {
Creation time.Time
Value *peer.Peer
}
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
//TODO: need to implement TTLs on providers
key := u.Key(pmes.GetKey())
......@@ -324,35 +360,10 @@ func (dht *IpfsDHT) Halt() {
dht.network.Close()
}
// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
u.DOut("Enter Ping.")
pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
before := time.Now()
response_chan := dht.ListenFor(pmes.Id)
dht.network.Chan.Outgoing <- mes
tout := time.After(timeout)
select {
case <-response_chan:
roundtrip := time.Since(before)
u.POut("Ping took %s.", roundtrip.String())
return nil
case <-tout:
// Timed out, think about removing peer from network
u.DOut("Ping peer timed out.")
return u.ErrTimeout
}
}
func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
u.DOut("Adding %s as provider for '%s'", p.Key().Pretty(), key)
dht.providerLock.Lock()
provs := dht.providers[key]
dht.providers[key] = append(provs, p)
dht.providers[key] = append(provs, &providerInfo{time.Now(), p})
dht.providerLock.Unlock()
}
......@@ -6,13 +6,9 @@ import (
ma "github.com/jbenet/go-multiaddr"
u "github.com/jbenet/go-ipfs/util"
"fmt"
"time"
)
var _ = fmt.Println
func TestPing(t *testing.T) {
u.Debug = false
addr_a,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234")
......
......@@ -74,6 +74,7 @@ type DHTMessage struct {
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
......@@ -116,6 +117,13 @@ func (m *DHTMessage) GetResponse() bool {
return false
}
func (m *DHTMessage) GetSuccess() bool {
if m != nil && m.Success != nil {
return *m.Success
}
return false
}
func init() {
proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value)
}
......@@ -22,4 +22,5 @@ message DHTMessage {
// Signals whether or not this message is a response to another message
optional bool response = 5;
optional bool success = 6;
}
......@@ -7,6 +7,7 @@ type pDHTMessage struct {
Value []byte
Response bool
Id uint64
Success bool
}
func (m *pDHTMessage) ToProtobuf() *DHTMessage {
......@@ -19,6 +20,7 @@ func (m *pDHTMessage) ToProtobuf() *DHTMessage {
pmes.Key = &m.Key
pmes.Response = &m.Response
pmes.Id = &m.Id
pmes.Success = &m.Success
return pmes
}
......@@ -208,3 +208,30 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error
return found_peer, nil
}
}
// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
u.DOut("Enter Ping.")
pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
before := time.Now()
response_chan := dht.ListenFor(pmes.Id)
dht.network.Chan.Outgoing <- mes
tout := time.After(timeout)
select {
case <-response_chan:
roundtrip := time.Since(before)
p.Distance = roundtrip //TODO: This isnt threadsafe
u.POut("Ping took %s.", roundtrip.String())
return nil
case <-tout:
// Timed out, think about removing peer from network
u.DOut("Ping peer timed out.")
dht.Unlisten(pmes.Id)
return u.ErrTimeout
}
}
package dht
import (
"encoding/hex"
"container/list"
"sort"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
// RoutingTable defines the routing table.
......@@ -114,7 +112,6 @@ func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer {
// Returns a list of the 'count' closest peers to the given ID
func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
u.POut("Searching table, size = %d", rt.Size())
cpl := xor(id, rt.local).commonPrefixLen()
// Get bucket at cpl index or last bucket
......@@ -148,8 +145,6 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
var out []*peer.Peer
for i := 0; i < count && i < peerArr.Len(); i++ {
out = append(out, peerArr[i].p)
u.POut("peer out: %s - %s", peerArr[i].p.ID.Pretty(),
hex.EncodeToString(xor(id, convertPeerID(peerArr[i].p.ID))))
}
return out
......@@ -163,3 +158,14 @@ func (rt *RoutingTable) Size() int {
}
return tot
}
// NOTE: This is potentially unsafe... use at your own risk
func (rt *RoutingTable) listpeers() []*peer.Peer {
var peers []*peer.Peer
for _,buck := range rt.Buckets {
for e := buck.getIter(); e != nil; e = e.Next() {
peers = append(peers, e.Value.(*peer.Peer))
}
}
return peers
}
......@@ -11,8 +11,8 @@ import (
// ID for IpfsDHT should be a byte slice, to allow for simpler operations
// (xor). DHT ids are based on the peer.IDs.
//
// NOTE: peer.IDs are biased because they are multihashes (first bytes
// biased). Thus, may need to re-hash keys (uniform dist). TODO(jbenet)
// The type dht.ID signifies that its contents have been hashed from either a
// peer.ID or a util.Key. This unifies the keyspace
type ID []byte
func (id ID) Equal(other ID) bool {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment