Commit 1071ec3e authored by Adin Schmahmann's avatar Adin Schmahmann Committed by Petar Maymounkov

feat: refactor key logging

parent fc3558cc
...@@ -469,7 +469,7 @@ func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Re ...@@ -469,7 +469,7 @@ func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Re
pmes.Record = rec pmes.Record = rec
rpmes, err := dht.sendRequest(ctx, p, pmes) rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil { if err != nil {
logger.Debugw("failed to put value to peer", "to", p, "key", loggableKeyBytes(rec.Key), "error", err) logger.Debugw("failed to put value to peer", "to", p, "key", loggableRecordKeyBytes(rec.Key), "error", err)
return err return err
} }
...@@ -526,17 +526,17 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) ( ...@@ -526,17 +526,17 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (
// getLocal attempts to retrieve the value from the datastore // getLocal attempts to retrieve the value from the datastore
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) { func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
logger.Debugw("finding value in datastore", "key", loggableKeyString(key)) logger.Debugw("finding value in datastore", "key", loggableRecordKeyString(key))
rec, err := dht.getRecordFromDatastore(mkDsKey(key)) rec, err := dht.getRecordFromDatastore(mkDsKey(key))
if err != nil { if err != nil {
logger.Warnw("get local failed", "key", key, "error", err) logger.Warnw("get local failed", "key", loggableRecordKeyString(key), "error", err)
return nil, err return nil, err
} }
// Double check the key. Can't hurt. // Double check the key. Can't hurt.
if rec != nil && string(rec.GetKey()) != key { if rec != nil && string(rec.GetKey()) != key {
logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", key, "got", rec.GetKey()) logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", loggableRecordKeyString(key), "got", rec.GetKey())
return nil, nil return nil, nil
} }
...@@ -547,7 +547,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) { ...@@ -547,7 +547,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error { func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
data, err := proto.Marshal(rec) data, err := proto.Marshal(rec)
if err != nil { if err != nil {
logger.Warnw("failed to put marshal record for local put", "error", err, "key", key) logger.Warnw("failed to put marshal record for local put", "error", err, "key", loggableRecordKeyString(key))
return err return err
} }
......
...@@ -167,7 +167,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess ...@@ -167,7 +167,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
// Make sure the record is valid (not expired, valid signature etc) // Make sure the record is valid (not expired, valid signature etc)
if err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue()); err != nil { if err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue()); err != nil {
logger.Infow("bad dht record in PUT", "from", p, "key", rec.GetKey(), "error", err) logger.Infow("bad dht record in PUT", "from", p, "key", loggableRecordKeyBytes(rec.GetKey()), "error", err)
return nil, err return nil, err
} }
...@@ -196,11 +196,11 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess ...@@ -196,11 +196,11 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
recs := [][]byte{rec.GetValue(), existing.GetValue()} recs := [][]byte{rec.GetValue(), existing.GetValue()}
i, err := dht.Validator.Select(string(rec.GetKey()), recs) i, err := dht.Validator.Select(string(rec.GetKey()), recs)
if err != nil { if err != nil {
logger.Warnw("dht record passed validation but failed select", "from", p, "key", rec.GetKey(), "error", err) logger.Warnw("dht record passed validation but failed select", "from", p, "key", loggableRecordKeyBytes(rec.GetKey()), "error", err)
return nil, err return nil, err
} }
if i != 0 { if i != 0 {
logger.Infow("DHT record in PUT older than existing record (ignoring)", "peer", p, "key", rec.GetKey()) logger.Infow("DHT record in PUT older than existing record (ignoring)", "peer", p, "key", loggableRecordKeyBytes(rec.GetKey()))
return nil, errors.New("old record") return nil, errors.New("old record")
} }
} }
...@@ -344,7 +344,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M ...@@ -344,7 +344,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
return nil, fmt.Errorf("handleAddProvider key is empty") return nil, fmt.Errorf("handleAddProvider key is empty")
} }
logger.Debugf("adding provider", "from", p, "key", key) logger.Debugf("adding provider", "from", p, "key", loggableProviderRecordBytes(key))
// add provider should use the address given in the message // add provider should use the address given in the message
pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers()) pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
......
package dht
import (
"fmt"
"github.com/multiformats/go-multibase"
"github.com/multiformats/go-multihash"
"strings"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-base32"
)
func tryFormatLoggableRecordKey(k string) (string, error) {
if len(k) == 0 {
return "", fmt.Errorf("loggableRecordKey is empty")
}
var proto, cstr string
if k[0] == '/' {
// it's a path (probably)
protoEnd := strings.IndexByte(k[1:], '/')
if protoEnd < 0 {
return "", fmt.Errorf("loggableRecordKey starts with '/' but is not a path: %s", base32.RawStdEncoding.EncodeToString([]byte(k)))
}
proto = k[1 : protoEnd+1]
cstr = k[protoEnd+2:]
var encStr string
c, err := cid.Cast([]byte(cstr))
if err == nil {
encStr = c.String()
} else {
encStr = base32.RawStdEncoding.EncodeToString([]byte(cstr))
}
return fmt.Sprintf("/%s/%s", proto, encStr), nil
}
return "", fmt.Errorf("loggableRecordKey is not a path: %s", base32.RawStdEncoding.EncodeToString([]byte(k)))
}
type loggableRecordKeyString string
func (lk loggableRecordKeyString) String() string {
k := string(lk)
newKey, err := tryFormatLoggableRecordKey(k)
if err == nil {
return newKey
}
return err.Error()
}
type loggableRecordKeyBytes []byte
func (lk loggableRecordKeyBytes) String() string {
k := string(lk)
newKey, err := tryFormatLoggableRecordKey(k)
if err == nil {
return newKey
}
return err.Error()
}
type loggableProviderRecordBytes []byte
func (lk loggableProviderRecordBytes) String() string {
k := string(lk)
newKey, err := tryFormatLoggableProviderKey(k)
if err == nil {
return newKey
}
return err.Error()
}
func tryFormatLoggableProviderKey(k string) (string, error) {
if len(k) == 0 {
return "", fmt.Errorf("loggableProviderKey is empty")
}
h, err := multihash.Cast([]byte(k))
if err == nil {
c := cid.NewCidV1(cid.Raw, h)
encStr, err := c.StringOfBase(multibase.Base32)
if err != nil {
panic(fmt.Errorf("should be impossible to reach here : %w", err))
}
return encStr, nil
}
// The DHT used to provide CIDs, but now provides multihashes
// TODO: Drop this when enough of the network has upgraded
c, err := cid.Cast([]byte(k))
if err == nil {
encStr, err := c.StringOfBase(multibase.Base32)
if err != nil {
panic(fmt.Errorf("should be impossible to reach here : %w", err))
}
return encStr, nil
}
return "", fmt.Errorf("invalid provider record: %s : err %w", base32.RawStdEncoding.EncodeToString([]byte(k)), err)
}
...@@ -3,69 +3,15 @@ package dht ...@@ -3,69 +3,15 @@ package dht
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing" "github.com/libp2p/go-libp2p-core/routing"
"github.com/ipfs/go-cid"
pb "github.com/libp2p/go-libp2p-kad-dht/pb" pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kb "github.com/libp2p/go-libp2p-kbucket" kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/multiformats/go-base32"
) )
func tryFormatLoggableKey(k string) (string, error) {
if len(k) == 0 {
return "", fmt.Errorf("loggableKey is empty")
}
var proto, cstr string
if k[0] == '/' {
// it's a path (probably)
protoEnd := strings.IndexByte(k[1:], '/')
if protoEnd < 0 {
return k, fmt.Errorf("loggableKey starts with '/' but is not a path: %x", k)
}
proto = k[1 : protoEnd+1]
cstr = k[protoEnd+2:]
} else {
proto = "provider"
cstr = k
}
var encStr string
c, err := cid.Cast([]byte(cstr))
if err == nil {
encStr = c.String()
} else {
encStr = base32.RawStdEncoding.EncodeToString([]byte(cstr))
}
return fmt.Sprintf("/%s/%s", proto, encStr), nil
}
type loggableKeyBytes []byte
func (lk loggableKeyString) String() string {
k := string(lk)
newKey, err := tryFormatLoggableKey(k)
if err == nil {
return newKey
}
return k
}
type loggableKeyString string
func (lk loggableKeyBytes) String() string {
k := string(lk)
newKey, err := tryFormatLoggableKey(k)
if err == nil {
return newKey
}
return k
}
// GetClosestPeers is a Kademlia 'node lookup' operation. Returns a channel of // GetClosestPeers is a Kademlia 'node lookup' operation. Returns a channel of
// the K closest peers to the given key. // the K closest peers to the given key.
// //
......
...@@ -6,37 +6,72 @@ import ( ...@@ -6,37 +6,72 @@ import (
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
) )
func TestLoggableKey(t *testing.T) { func TestLoggableRecordKey(t *testing.T) {
c, err := cid.Decode("QmfUvYQhL2GinafMbPDYz7VFoZv4iiuLuR33aRsPurXGag") c, err := cid.Decode("QmfUvYQhL2GinafMbPDYz7VFoZv4iiuLuR33aRsPurXGag")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
k, err := tryFormatLoggableKey("/proto/" + string(c.Bytes())) k, err := tryFormatLoggableRecordKey("/proto/" + string(c.Bytes()))
if err != nil { if err != nil {
t.Errorf("failed to format key 1: %s", err) t.Errorf("failed to format key: %s", err)
} }
if k != "/proto/"+c.String() { if k != "/proto/"+c.String() {
t.Error("expected path to be preserved as a loggable key") t.Error("expected path to be preserved as a loggable key")
} }
k, err = tryFormatLoggableKey(string(c.Bytes())) for _, s := range []string{"/bla", "", "bla bla"} {
if _, err := tryFormatLoggableRecordKey(s); err == nil {
t.Errorf("expected to fail formatting: %s", s)
}
}
for _, s := range []string{"/bla/asdf", "/a/b/c"} {
if _, err := tryFormatLoggableRecordKey(s); err != nil {
t.Errorf("expected to be formatable: %s", s)
}
}
}
func TestLoggableProviderKey(t *testing.T) {
c0, err := cid.Decode("QmfUvYQhL2GinafMbPDYz7VFoZv4iiuLuR33aRsPurXGag")
if err != nil { if err != nil {
t.Errorf("failed to format key 2: %s", err) t.Fatal(err)
} }
if k != "/provider/"+c.String() {
t.Error("expected cid to be formatted as a loggable key") // Test logging CIDv0 provider
c0ascidv1Raw := cid.NewCidV1(cid.Raw, c0.Hash())
k, err := tryFormatLoggableProviderKey(string(c0.Bytes()))
if err != nil {
t.Errorf("failed to format key: %s", err)
}
if k != c0ascidv1Raw.String() {
t.Error("expected cidv0 to be converted into CIDv1 b32 with Raw codec")
} }
for _, s := range []string{"/bla", ""} { // Test logging CIDv1 provider (from older DHT implementations)
if _, err := tryFormatLoggableKey(s); err == nil { c1 := cid.NewCidV1(cid.DagProtobuf, c0.Hash())
t.Errorf("expected to fail formatting: %s", s) k, err = tryFormatLoggableProviderKey(string(c1.Bytes()))
if err != nil {
t.Errorf("failed to format key: %s", err)
} }
if k != c1.String() {
t.Error("expected cidv1 to be displayed normally")
} }
for _, s := range []string{"bla bla", "/bla/asdf"} { // Test logging multihash provider
if _, err := tryFormatLoggableKey(s); err != nil { c1ascidv1Raw := cid.NewCidV1(cid.Raw, c1.Hash())
t.Errorf("expected to be formatable: %s", s) k, err = tryFormatLoggableProviderKey(string(c1.Hash()))
if err != nil {
t.Errorf("failed to format key: %s", err)
}
if k != c1ascidv1Raw.String() {
t.Error("expected multihash to be converted into CIDv1 b32 with Raw codec")
}
for _, s := range []string{"/bla", "", "bla bla", "/bla/asdf", "/a/b/c"} {
if _, err := tryFormatLoggableProviderKey(s); err == nil {
t.Errorf("expected to fail formatting: %s", s)
} }
} }
} }
...@@ -32,7 +32,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...@@ -32,7 +32,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
return routing.ErrNotSupported return routing.ErrNotSupported
} }
logger.Debugw("putting value", "key", loggableKeyString(key)) logger.Debugw("putting value", "key", loggableRecordKeyString(key))
// don't even allow local users to put bad values. // don't even allow local users to put bad values.
if err := dht.Validator.Validate(key, value); err != nil { if err := dht.Validator.Validate(key, value); err != nil {
...@@ -128,7 +128,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op ...@@ -128,7 +128,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op
if best == nil { if best == nil {
return nil, routing.ErrNotFound return nil, routing.ErrNotFound
} }
logger.Debugf("GetValue %v %v", key, best) logger.Debugf("GetValue %v %x", loggableRecordKeyString(key), best)
return best, nil return best, nil
} }
...@@ -247,7 +247,7 @@ loop: ...@@ -247,7 +247,7 @@ loop:
} }
sel, err := dht.Validator.Select(key, [][]byte{best, v.Val}) sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
if err != nil { if err != nil {
logger.Warnw("failed to select best value", "key", key, "error", err) logger.Warnw("failed to select best value", "key", loggableRecordKeyString(key), "error", err)
continue continue
} }
if sel != 1 { if sel != 1 {
...@@ -293,7 +293,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st ...@@ -293,7 +293,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
valCh := make(chan RecvdVal, 1) valCh := make(chan RecvdVal, 1)
lookupResCh := make(chan *lookupWithFollowupResult, 1) lookupResCh := make(chan *lookupWithFollowupResult, 1)
logger.Debugw("finding value", "key", loggableKeyString(key)) logger.Debugw("finding value", "key", loggableRecordKeyString(key))
if rec, err := dht.getLocal(key); rec != nil && err == nil { if rec, err := dht.getLocal(key); rec != nil && err == nil {
select { select {
...@@ -398,9 +398,8 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err ...@@ -398,9 +398,8 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
} else if !key.Defined() { } else if !key.Defined() {
return fmt.Errorf("invalid cid: undefined") return fmt.Errorf("invalid cid: undefined")
} }
logger.Debugw("finding provider", "cid", key)
keyMH := key.Hash() keyMH := key.Hash()
logger.Debugw("providing", "cid", key, "mh", loggableProviderRecordBytes(keyMH))
// add self locally // add self locally
dht.ProviderManager.AddProvider(ctx, keyMH, dht.self) dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
...@@ -455,7 +454,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err ...@@ -455,7 +454,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
wg.Add(1) wg.Add(1)
go func(p peer.ID) { go func(p peer.ID) {
defer wg.Done() defer wg.Done()
logger.Debugf("putProvider(%s, %s)", keyMH, p) logger.Debugf("putProvider(%s, %s)", loggableProviderRecordBytes(keyMH), p)
err := dht.sendMessage(ctx, p, mes) err := dht.sendMessage(ctx, p, mes)
if err != nil { if err != nil {
logger.Debug(err) logger.Debug(err)
...@@ -520,13 +519,12 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i ...@@ -520,13 +519,12 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
keyMH := key.Hash() keyMH := key.Hash()
logger.Debugw("finding providers", "cid", key, "mh", loggableProviderRecordBytes(keyMH))
go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut) go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
return peerOut return peerOut
} }
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
logger.Debugw("finding providers", "key", key)
defer close(peerOut) defer close(peerOut)
findAll := count == 0 findAll := count == 0
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/multiformats/go-base32"
) )
var logger = logging.Logger("dht/RtRefreshManager") var logger = logging.Logger("dht/RtRefreshManager")
...@@ -263,7 +264,7 @@ func (r *RtRefreshManager) refreshCpl(cpl uint) error { ...@@ -263,7 +264,7 @@ func (r *RtRefreshManager) refreshCpl(cpl uint) error {
} }
logger.Infof("starting refreshing cpl %d with key %s (routing table size was %d)", logger.Infof("starting refreshing cpl %d with key %s (routing table size was %d)",
cpl, key, r.rt.Size()) cpl, loggableRawKeyString(key), r.rt.Size())
if err := r.runRefreshDHTQuery(key); err != nil { if err := r.runRefreshDHTQuery(key); err != nil {
return fmt.Errorf("failed to refresh cpl=%d, err=%s", cpl, err) return fmt.Errorf("failed to refresh cpl=%d, err=%s", cpl, err)
...@@ -292,3 +293,17 @@ func (r *RtRefreshManager) runRefreshDHTQuery(key string) error { ...@@ -292,3 +293,17 @@ func (r *RtRefreshManager) runRefreshDHTQuery(key string) error {
return err return err
} }
type loggableRawKeyString string
func (lk loggableRawKeyString) String() string {
k := string(lk)
if len(k) == 0 {
return k
}
encStr := base32.RawStdEncoding.EncodeToString([]byte(k))
return encStr
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment