Commit 903a3095 authored by Jeromy's avatar Jeromy

some better logging and cleanup

parent cda86e98
...@@ -102,8 +102,8 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error { ...@@ -102,8 +102,8 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
return nil return nil
} }
// putValueToNetwork stores the given key/value pair at the peer 'p' // putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.ID, func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
key u.Key, rec *pb.Record) error { key u.Key, rec *pb.Record) error {
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0) pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
...@@ -237,12 +237,12 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) { ...@@ -237,12 +237,12 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
} }
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.PeerInfo, *kb.RoutingTable) { func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo {
p := dht.routingTable.Find(id) p := dht.routingTable.Find(id)
if p != "" { if p != "" {
return dht.peerstore.PeerInfo(p), dht.routingTable return dht.peerstore.PeerInfo(p)
} }
return peer.PeerInfo{}, nil return peer.PeerInfo{}
} }
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is // findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
...@@ -256,26 +256,6 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Ke ...@@ -256,26 +256,6 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Ke
return dht.sendRequest(ctx, p, pmes) return dht.sendRequest(ctx, p, pmes)
} }
func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.ID {
peers := pb.PBPeersToPeerInfos(pbps)
var provArr []peer.ID
for _, pi := range peers {
p := pi.ID
// Dont add outselves to the list
if p == dht.self {
continue
}
log.Debugf("%s adding provider: %s for %s", dht.self, p, key)
// TODO(jbenet) ensure providers is idempotent
dht.providers.AddProvider(key, p)
provArr = append(provArr, p)
}
return provArr
}
// nearestPeersToQuery returns the routing tables closest peers. // nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID { func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
key := u.Key(pmes.GetKey()) key := u.Key(pmes.GetKey())
......
...@@ -39,7 +39,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { ...@@ -39,7 +39,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
} }
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey()) log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())
// setup response // setup response
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
...@@ -127,7 +127,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess ...@@ -127,7 +127,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
} }
err = dht.datastore.Put(dskey, data) err = dht.datastore.Put(dskey, data)
log.Debugf("%s handlePutValue %v\n", dht.self, dskey) log.Debugf("%s handlePutValue %v", dht.self, dskey)
return pmes, err return pmes, err
} }
...@@ -137,9 +137,6 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) ( ...@@ -137,9 +137,6 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (
} }
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
log.Errorf("handle find peer %s start", p)
defer log.Errorf("handle find peer %s end", p)
resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel()) resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
var closest []peer.ID var closest []peer.ID
......
...@@ -50,7 +50,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error ...@@ -50,7 +50,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
wg.Add(1) wg.Add(1)
go func(p peer.ID) { go func(p peer.ID) {
defer wg.Done() defer wg.Done()
err := dht.putValueToNetwork(ctx, p, key, rec) err := dht.putValueToPeer(ctx, p, key, rec)
if err != nil { if err != nil {
log.Errorf("failed putting value to peer: %s", err) log.Errorf("failed putting value to peer: %s", err)
} }
...@@ -125,12 +125,18 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { ...@@ -125,12 +125,18 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
return err return err
} }
wg := sync.WaitGroup{}
for p := range peers { for p := range peers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
err := dht.putProvider(ctx, p, string(key)) err := dht.putProvider(ctx, p, string(key))
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} }
}(p)
} }
wg.Wait()
return nil return nil
} }
...@@ -144,7 +150,6 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn ...@@ -144,7 +150,6 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn
} }
func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) (<-chan peer.ID, error) { func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) (<-chan peer.ID, error) {
log.Error("Get Closest Peers")
tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
if len(tablepeers) == 0 { if len(tablepeers) == 0 {
return nil, kb.ErrLookupFailure return nil, kb.ErrLookupFailure
...@@ -170,15 +175,12 @@ func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) ( ...@@ -170,15 +175,12 @@ func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) (
go func() { go func() {
wg.Wait() wg.Wait()
close(out) close(out)
log.Error("Closing closest peer chan")
}() }()
return out, nil return out, nil
} }
func (dht *IpfsDHT) getClosestPeersRecurse(ctx context.Context, key u.Key, p peer.ID, peers *pset.PeerSet, peerOut chan<- peer.ID) { func (dht *IpfsDHT) getClosestPeersRecurse(ctx context.Context, key u.Key, p peer.ID, peers *pset.PeerSet, peerOut chan<- peer.ID) {
log.Error("closest peers recurse")
defer log.Error("closest peers recurse end")
closer, err := dht.closerPeersSingle(ctx, key, p) closer, err := dht.closerPeersSingle(ctx, key, p)
if err != nil { if err != nil {
log.Errorf("error getting closer peers: %s", err) log.Errorf("error getting closer peers: %s", err)
...@@ -204,8 +206,6 @@ func (dht *IpfsDHT) getClosestPeersRecurse(ctx context.Context, key u.Key, p pee ...@@ -204,8 +206,6 @@ func (dht *IpfsDHT) getClosestPeersRecurse(ctx context.Context, key u.Key, p pee
} }
func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) { func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) {
log.Errorf("closest peers single %s %s", p, key)
defer log.Errorf("closest peers single end %s %s", p, key)
pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -236,6 +236,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int ...@@ -236,6 +236,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) { func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) {
defer close(peerOut) defer close(peerOut)
defer log.Event(ctx, "findProviders end", &key)
log.Debugf("%s FindProviders %s", dht.self, key) log.Debugf("%s FindProviders %s", dht.self, key)
ps := pset.NewLimited(count) ps := pset.NewLimited(count)
...@@ -294,40 +295,11 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co ...@@ -294,40 +295,11 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
} }
} }
func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *pset.PeerSet, count int, out chan peer.PeerInfo) {
var wg sync.WaitGroup
peerInfos := pb.PBPeersToPeerInfos(peers)
for _, pi := range peerInfos {
wg.Add(1)
go func(pi peer.PeerInfo) {
defer wg.Done()
p := pi.ID
if err := dht.ensureConnectedToPeer(ctx, p); err != nil {
log.Errorf("%s", err)
return
}
dht.providers.AddProvider(k, p)
if ps.TryAdd(p) {
select {
case out <- pi:
case <-ctx.Done():
return
}
} else if ps.Size() >= count {
return
}
}(pi)
}
wg.Wait()
}
// FindPeer searches for a peer with given ID. // FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) { func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) {
// Check if were already connected to them // Check if were already connected to them
if pi, _ := dht.FindLocal(id); pi.ID != "" { if pi := dht.FindLocal(id); pi.ID != "" {
return pi, nil return pi, nil
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment