Commit e10f4975 authored by Jeromy's avatar Jeromy

remove multilayered routing table from the DHT (for now)

parent 3f3321c1
...@@ -37,7 +37,7 @@ const doPinging = false ...@@ -37,7 +37,7 @@ const doPinging = false
type IpfsDHT struct { type IpfsDHT struct {
// Array of routing tables for differently distanced nodes // Array of routing tables for differently distanced nodes
// NOTE: (currently, only a single table is used) // NOTE: (currently, only a single table is used)
routingTables []*kb.RoutingTable routingTable *kb.RoutingTable
// the network services we need // the network services we need
dialer inet.Dialer dialer inet.Dialer
...@@ -80,10 +80,7 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia ...@@ -80,10 +80,7 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
dht.providers = NewProviderManager(dht.Context(), p.ID()) dht.providers = NewProviderManager(dht.Context(), p.ID())
dht.AddCloserChild(dht.providers) dht.AddCloserChild(dht.providers)
dht.routingTables = make([]*kb.RoutingTable, 3) dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Minute)
dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour)
dht.birth = time.Now() dht.birth = time.Now()
dht.Validators = make(map[string]ValidatorFunc) dht.Validators = make(map[string]ValidatorFunc)
...@@ -243,9 +240,9 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) er ...@@ -243,9 +240,9 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) er
} }
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer, func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
key u.Key, level int) ([]byte, []peer.Peer, error) { key u.Key) ([]byte, []peer.Peer, error) {
pmes, err := dht.getValueSingle(ctx, p, key, level) pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
...@@ -265,7 +262,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer, ...@@ -265,7 +262,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
// TODO decide on providers. This probably shouldn't be happening. // TODO decide on providers. This probably shouldn't be happening.
if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 { if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
val, err := dht.getFromPeerList(ctx, key, prv, level) val, err := dht.getFromPeerList(ctx, key, prv)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
...@@ -292,9 +289,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer, ...@@ -292,9 +289,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
// getValueSingle simply performs the get value RPC with the given parameters // getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer, func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
key u.Key, level int) (*pb.Message, error) { key u.Key) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level) pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
return dht.sendRequest(ctx, p, pmes) return dht.sendRequest(ctx, p, pmes)
} }
...@@ -303,7 +300,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer, ...@@ -303,7 +300,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
// one to get the value from? Or just connect to one at a time until we get a // one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it? // successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key, func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
peerlist []*pb.Message_Peer, level int) ([]byte, error) { peerlist []*pb.Message_Peer) ([]byte, error) {
for _, pinfo := range peerlist { for _, pinfo := range peerlist {
p, err := dht.ensureConnectedToPeer(ctx, pinfo) p, err := dht.ensureConnectedToPeer(ctx, pinfo)
...@@ -312,7 +309,7 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key, ...@@ -312,7 +309,7 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
continue continue
} }
pmes, err := dht.getValueSingle(ctx, p, key, level) pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil { if err != nil {
log.Errorf("getFromPeers error: %s\n", err) log.Errorf("getFromPeers error: %s\n", err)
continue continue
...@@ -379,47 +376,30 @@ func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error { ...@@ -379,47 +376,30 @@ func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
return dht.datastore.Put(key.DsKey(), data) return dht.datastore.Put(key.DsKey(), data)
} }
// Update signals to all routingTables to Update their last-seen status // Update signals the routingTable to Update its last-seen status
// on the given peer. // on the given peer.
func (dht *IpfsDHT) Update(ctx context.Context, p peer.Peer) { func (dht *IpfsDHT) Update(ctx context.Context, p peer.Peer) {
log.Event(ctx, "updatePeer", p) log.Event(ctx, "updatePeer", p)
removedCount := 0 dht.routingTable.Update(p)
for _, route := range dht.routingTables {
removed := route.Update(p)
// Only close the connection if no tables refer to this peer
if removed != nil {
removedCount++
}
}
// Only close the connection if no tables refer to this peer
// if removedCount == len(dht.routingTables) {
// dht.network.ClosePeer(p)
// }
// ACTUALLY, no, let's not just close the connection. it may be connected
// due to other things. it seems that we just need connection timeouts
// after some deadline of inactivity.
} }
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) { func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
for _, table := range dht.routingTables { p := dht.routingTable.Find(id)
p := table.Find(id) if p != nil {
if p != nil { return p, dht.routingTable
return p, table
}
} }
return nil, nil return nil, nil
} }
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is // findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) { func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level) pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
return dht.sendRequest(ctx, p, pmes) return dht.sendRequest(ctx, p, pmes)
} }
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) { func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level) pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
return dht.sendRequest(ctx, p, pmes) return dht.sendRequest(ctx, p, pmes)
} }
...@@ -446,11 +426,8 @@ func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.Peer ...@@ -446,11 +426,8 @@ func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.Peer
// nearestPeersToQuery returns the routing tables closest peers. // nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer { func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
level := pmes.GetClusterLevel()
cluster := dht.routingTables[level]
key := u.Key(pmes.GetKey()) key := u.Key(pmes.GetKey())
closer := cluster.NearestPeers(kb.ConvertKey(key), count) closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count)
return closer return closer
} }
...@@ -537,7 +514,7 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) { ...@@ -537,7 +514,7 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
case <-tick: case <-tick:
id := make([]byte, 16) id := make([]byte, 16)
rand.Read(id) rand.Read(id)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5) peers := dht.routingTable.NearestPeers(kb.ConvertKey(u.Key(id)), 5)
for _, p := range peers { for _, p := range peers {
ctx, _ := context.WithTimeout(dht.Context(), time.Second*5) ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
err := dht.Ping(ctx, p) err := dht.Ping(ctx, p)
......
...@@ -36,7 +36,7 @@ func (dht *IpfsDHT) getDiagInfo() *diagInfo { ...@@ -36,7 +36,7 @@ func (dht *IpfsDHT) getDiagInfo() *diagInfo {
di.LifeSpan = time.Since(dht.birth) di.LifeSpan = time.Since(dht.birth)
di.Keys = nil // Currently no way to query datastore di.Keys = nil // Currently no way to query datastore
for _, p := range dht.routingTables[0].ListPeers() { for _, p := range dht.routingTable.ListPeers() {
d := connDiagInfo{p.GetLatency(), p.ID()} d := connDiagInfo{p.GetLatency(), p.ID()}
di.Connections = append(di.Connections, d) di.Connections = append(di.Connections, d)
} }
......
...@@ -38,11 +38,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error ...@@ -38,11 +38,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
return err return err
} }
var peers []peer.Peer peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
for _, route := range dht.routingTables {
npeers := route.NearestPeers(kb.ConvertKey(key), KValue)
peers = append(peers, npeers...)
}
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
log.Debugf("%s PutValue qry part %v", dht.self, p) log.Debugf("%s PutValue qry part %v", dht.self, p)
...@@ -71,9 +67,8 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { ...@@ -71,9 +67,8 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
return val, nil return val, nil
} }
// get closest peers in the routing tables // get closest peers in the routing table
routeLevel := 0 closest := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize)
if closest == nil || len(closest) == 0 { if closest == nil || len(closest) == 0 {
log.Warning("Got no peers back from routing table!") log.Warning("Got no peers back from routing table!")
return nil, kb.ErrLookupFailure return nil, kb.ErrLookupFailure
...@@ -82,7 +77,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { ...@@ -82,7 +77,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
// setup the Query // setup the Query
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel) val, peers, err := dht.getValueOrPeers(ctx, p, key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -116,7 +111,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { ...@@ -116,7 +111,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
dht.providers.AddProvider(key, dht.self) dht.providers.AddProvider(key, dht.self)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize) peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
if len(peers) == 0 { if len(peers) == 0 {
return nil return nil
} }
...@@ -166,7 +161,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co ...@@ -166,7 +161,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
// setup the Query // setup the Query
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findProvidersSingle(ctx, p, key, 0) pmes, err := dht.findProvidersSingle(ctx, p, key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -205,7 +200,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co ...@@ -205,7 +200,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
return &dhtQueryResult{closerPeers: clpeers}, nil return &dhtQueryResult{closerPeers: clpeers}, nil
}) })
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue) peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
_, err := query.Run(ctx, peers) _, err := query.Run(ctx, peers)
if err != nil { if err != nil {
log.Errorf("FindProviders Query error: %s", err) log.Errorf("FindProviders Query error: %s", err)
...@@ -253,8 +248,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) ...@@ -253,8 +248,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error)
return p, nil return p, nil
} }
routeLevel := 0 closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if closest == nil || len(closest) == 0 { if closest == nil || len(closest) == 0 {
return nil, kb.ErrLookupFailure return nil, kb.ErrLookupFailure
} }
...@@ -270,7 +264,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) ...@@ -270,7 +264,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error)
// setup the Query // setup the Query
query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel) pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -316,8 +310,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< ...@@ -316,8 +310,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
peerchan := make(chan peer.Peer, asyncQueryBuffer) peerchan := make(chan peer.Peer, asyncQueryBuffer)
peersSeen := map[string]peer.Peer{} peersSeen := map[string]peer.Peer{}
routeLevel := 0 closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if closest == nil || len(closest) == 0 { if closest == nil || len(closest) == 0 {
return nil, kb.ErrLookupFailure return nil, kb.ErrLookupFailure
} }
...@@ -325,7 +318,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< ...@@ -325,7 +318,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
// setup the Query // setup the Query
query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel) pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment