Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-p2p-kad-dht
Commits
7a80399a
Commit
7a80399a
authored
Oct 06, 2020
by
Adin Schmahmann
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
refactor: move logging helpers to internal package
parent
7b5446ab
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
22 additions
and
18 deletions
+22
-18
dht.go
dht.go
+5
-4
handlers.go
handlers.go
+5
-4
internal/logging.go
internal/logging.go
+1
-1
internal/logging_test.go
internal/logging_test.go
+1
-1
messages.go
messages.go
+2
-1
routing.go
routing.go
+8
-7
No files found.
dht.go
View file @
7a80399a
...
@@ -16,6 +16,7 @@ import (
...
@@ -16,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/libp2p/go-libp2p-kad-dht/internal"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb
"github.com/libp2p/go-libp2p-kad-dht/pb"
pb
"github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kad-dht/providers"
...
@@ -540,17 +541,17 @@ var errInvalidRecord = errors.New("received invalid record")
...
@@ -540,17 +541,17 @@ var errInvalidRecord = errors.New("received invalid record")
// getLocal attempts to retrieve the value from the datastore
// getLocal attempts to retrieve the value from the datastore
func
(
dht
*
IpfsDHT
)
getLocal
(
key
string
)
(
*
recpb
.
Record
,
error
)
{
func
(
dht
*
IpfsDHT
)
getLocal
(
key
string
)
(
*
recpb
.
Record
,
error
)
{
logger
.
Debugw
(
"finding value in datastore"
,
"key"
,
LoggableRecordKeyString
(
key
))
logger
.
Debugw
(
"finding value in datastore"
,
"key"
,
internal
.
LoggableRecordKeyString
(
key
))
rec
,
err
:=
dht
.
getRecordFromDatastore
(
mkDsKey
(
key
))
rec
,
err
:=
dht
.
getRecordFromDatastore
(
mkDsKey
(
key
))
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Warnw
(
"get local failed"
,
"key"
,
LoggableRecordKeyString
(
key
),
"error"
,
err
)
logger
.
Warnw
(
"get local failed"
,
"key"
,
internal
.
LoggableRecordKeyString
(
key
),
"error"
,
err
)
return
nil
,
err
return
nil
,
err
}
}
// Double check the key. Can't hurt.
// Double check the key. Can't hurt.
if
rec
!=
nil
&&
string
(
rec
.
GetKey
())
!=
key
{
if
rec
!=
nil
&&
string
(
rec
.
GetKey
())
!=
key
{
logger
.
Errorw
(
"BUG: found a DHT record that didn't match it's key"
,
"expected"
,
LoggableRecordKeyString
(
key
),
"got"
,
rec
.
GetKey
())
logger
.
Errorw
(
"BUG: found a DHT record that didn't match it's key"
,
"expected"
,
internal
.
LoggableRecordKeyString
(
key
),
"got"
,
rec
.
GetKey
())
return
nil
,
nil
return
nil
,
nil
}
}
...
@@ -561,7 +562,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
...
@@ -561,7 +562,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
func
(
dht
*
IpfsDHT
)
putLocal
(
key
string
,
rec
*
recpb
.
Record
)
error
{
func
(
dht
*
IpfsDHT
)
putLocal
(
key
string
,
rec
*
recpb
.
Record
)
error
{
data
,
err
:=
proto
.
Marshal
(
rec
)
data
,
err
:=
proto
.
Marshal
(
rec
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Warnw
(
"failed to put marshal record for local put"
,
"error"
,
err
,
"key"
,
LoggableRecordKeyString
(
key
))
logger
.
Warnw
(
"failed to put marshal record for local put"
,
"error"
,
err
,
"key"
,
internal
.
LoggableRecordKeyString
(
key
))
return
err
return
err
}
}
...
...
handlers.go
View file @
7a80399a
...
@@ -14,6 +14,7 @@ import (
...
@@ -14,6 +14,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/proto"
ds
"github.com/ipfs/go-datastore"
ds
"github.com/ipfs/go-datastore"
u
"github.com/ipfs/go-ipfs-util"
u
"github.com/ipfs/go-ipfs-util"
"github.com/libp2p/go-libp2p-kad-dht/internal"
pb
"github.com/libp2p/go-libp2p-kad-dht/pb"
pb
"github.com/libp2p/go-libp2p-kad-dht/pb"
recpb
"github.com/libp2p/go-libp2p-record/pb"
recpb
"github.com/libp2p/go-libp2p-record/pb"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-base32"
...
@@ -167,7 +168,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
...
@@ -167,7 +168,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
// Make sure the record is valid (not expired, valid signature etc)
// Make sure the record is valid (not expired, valid signature etc)
if
err
=
dht
.
Validator
.
Validate
(
string
(
rec
.
GetKey
()),
rec
.
GetValue
());
err
!=
nil
{
if
err
=
dht
.
Validator
.
Validate
(
string
(
rec
.
GetKey
()),
rec
.
GetValue
());
err
!=
nil
{
logger
.
Infow
(
"bad dht record in PUT"
,
"from"
,
p
,
"key"
,
LoggableRecordKeyBytes
(
rec
.
GetKey
()),
"error"
,
err
)
logger
.
Infow
(
"bad dht record in PUT"
,
"from"
,
p
,
"key"
,
internal
.
LoggableRecordKeyBytes
(
rec
.
GetKey
()),
"error"
,
err
)
return
nil
,
err
return
nil
,
err
}
}
...
@@ -196,11 +197,11 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
...
@@ -196,11 +197,11 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
recs
:=
[][]
byte
{
rec
.
GetValue
(),
existing
.
GetValue
()}
recs
:=
[][]
byte
{
rec
.
GetValue
(),
existing
.
GetValue
()}
i
,
err
:=
dht
.
Validator
.
Select
(
string
(
rec
.
GetKey
()),
recs
)
i
,
err
:=
dht
.
Validator
.
Select
(
string
(
rec
.
GetKey
()),
recs
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Warnw
(
"dht record passed validation but failed select"
,
"from"
,
p
,
"key"
,
LoggableRecordKeyBytes
(
rec
.
GetKey
()),
"error"
,
err
)
logger
.
Warnw
(
"dht record passed validation but failed select"
,
"from"
,
p
,
"key"
,
internal
.
LoggableRecordKeyBytes
(
rec
.
GetKey
()),
"error"
,
err
)
return
nil
,
err
return
nil
,
err
}
}
if
i
!=
0
{
if
i
!=
0
{
logger
.
Infow
(
"DHT record in PUT older than existing record (ignoring)"
,
"peer"
,
p
,
"key"
,
LoggableRecordKeyBytes
(
rec
.
GetKey
()))
logger
.
Infow
(
"DHT record in PUT older than existing record (ignoring)"
,
"peer"
,
p
,
"key"
,
internal
.
LoggableRecordKeyBytes
(
rec
.
GetKey
()))
return
nil
,
errors
.
New
(
"old record"
)
return
nil
,
errors
.
New
(
"old record"
)
}
}
}
}
...
@@ -344,7 +345,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
...
@@ -344,7 +345,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
return
nil
,
fmt
.
Errorf
(
"handleAddProvider key is empty"
)
return
nil
,
fmt
.
Errorf
(
"handleAddProvider key is empty"
)
}
}
logger
.
Debugf
(
"adding provider"
,
"from"
,
p
,
"key"
,
LoggableProviderRecordBytes
(
key
))
logger
.
Debugf
(
"adding provider"
,
"from"
,
p
,
"key"
,
internal
.
LoggableProviderRecordBytes
(
key
))
// add provider should use the address given in the message
// add provider should use the address given in the message
pinfos
:=
pb
.
PBPeersToPeerInfos
(
pmes
.
GetProviderPeers
())
pinfos
:=
pb
.
PBPeersToPeerInfos
(
pmes
.
GetProviderPeers
())
...
...
logging.go
→
internal/
logging.go
View file @
7a80399a
package
dht
package
internal
import
(
import
(
"fmt"
"fmt"
...
...
logging_test.go
→
internal/
logging_test.go
View file @
7a80399a
package
dht
package
internal
import
(
import
(
"testing"
"testing"
...
...
messages.go
View file @
7a80399a
...
@@ -14,6 +14,7 @@ import (
...
@@ -14,6 +14,7 @@ import (
recpb
"github.com/libp2p/go-libp2p-record/pb"
recpb
"github.com/libp2p/go-libp2p-record/pb"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-multihash"
"github.com/libp2p/go-libp2p-kad-dht/internal"
pb
"github.com/libp2p/go-libp2p-kad-dht/pb"
pb
"github.com/libp2p/go-libp2p-kad-dht/pb"
)
)
...
@@ -67,7 +68,7 @@ func (pm *ProtocolMessenger) PutValue(ctx context.Context, p peer.ID, rec *recpb
...
@@ -67,7 +68,7 @@ func (pm *ProtocolMessenger) PutValue(ctx context.Context, p peer.ID, rec *recpb
pmes
.
Record
=
rec
pmes
.
Record
=
rec
rpmes
,
err
:=
pm
.
m
.
SendRequest
(
ctx
,
p
,
pmes
)
rpmes
,
err
:=
pm
.
m
.
SendRequest
(
ctx
,
p
,
pmes
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Debugw
(
"failed to put value to peer"
,
"to"
,
p
,
"key"
,
LoggableRecordKeyBytes
(
rec
.
Key
),
"error"
,
err
)
logger
.
Debugw
(
"failed to put value to peer"
,
"to"
,
p
,
"key"
,
internal
.
LoggableRecordKeyBytes
(
rec
.
Key
),
"error"
,
err
)
return
err
return
err
}
}
...
...
routing.go
View file @
7a80399a
...
@@ -14,6 +14,7 @@ import (
...
@@ -14,6 +14,7 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cid"
u
"github.com/ipfs/go-ipfs-util"
u
"github.com/ipfs/go-ipfs-util"
"github.com/libp2p/go-libp2p-kad-dht/internal"
"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
kb
"github.com/libp2p/go-libp2p-kbucket"
kb
"github.com/libp2p/go-libp2p-kbucket"
record
"github.com/libp2p/go-libp2p-record"
record
"github.com/libp2p/go-libp2p-record"
...
@@ -31,7 +32,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
...
@@ -31,7 +32,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
return
routing
.
ErrNotSupported
return
routing
.
ErrNotSupported
}
}
logger
.
Debugw
(
"putting value"
,
"key"
,
LoggableRecordKeyString
(
key
))
logger
.
Debugw
(
"putting value"
,
"key"
,
internal
.
LoggableRecordKeyString
(
key
))
// don't even allow local users to put bad values.
// don't even allow local users to put bad values.
if
err
:=
dht
.
Validator
.
Validate
(
key
,
value
);
err
!=
nil
{
if
err
:=
dht
.
Validator
.
Validate
(
key
,
value
);
err
!=
nil
{
...
@@ -127,7 +128,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op
...
@@ -127,7 +128,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op
if
best
==
nil
{
if
best
==
nil
{
return
nil
,
routing
.
ErrNotFound
return
nil
,
routing
.
ErrNotFound
}
}
logger
.
Debugf
(
"GetValue %v %x"
,
LoggableRecordKeyString
(
key
),
best
)
logger
.
Debugf
(
"GetValue %v %x"
,
internal
.
LoggableRecordKeyString
(
key
),
best
)
return
best
,
nil
return
best
,
nil
}
}
...
@@ -246,7 +247,7 @@ loop:
...
@@ -246,7 +247,7 @@ loop:
}
}
sel
,
err
:=
dht
.
Validator
.
Select
(
key
,
[][]
byte
{
best
,
v
.
Val
})
sel
,
err
:=
dht
.
Validator
.
Select
(
key
,
[][]
byte
{
best
,
v
.
Val
})
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Warnw
(
"failed to select best value"
,
"key"
,
LoggableRecordKeyString
(
key
),
"error"
,
err
)
logger
.
Warnw
(
"failed to select best value"
,
"key"
,
internal
.
LoggableRecordKeyString
(
key
),
"error"
,
err
)
continue
continue
}
}
if
sel
!=
1
{
if
sel
!=
1
{
...
@@ -292,7 +293,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
...
@@ -292,7 +293,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
valCh
:=
make
(
chan
RecvdVal
,
1
)
valCh
:=
make
(
chan
RecvdVal
,
1
)
lookupResCh
:=
make
(
chan
*
lookupWithFollowupResult
,
1
)
lookupResCh
:=
make
(
chan
*
lookupWithFollowupResult
,
1
)
logger
.
Debugw
(
"finding value"
,
"key"
,
LoggableRecordKeyString
(
key
))
logger
.
Debugw
(
"finding value"
,
"key"
,
internal
.
LoggableRecordKeyString
(
key
))
if
rec
,
err
:=
dht
.
getLocal
(
key
);
rec
!=
nil
&&
err
==
nil
{
if
rec
,
err
:=
dht
.
getLocal
(
key
);
rec
!=
nil
&&
err
==
nil
{
select
{
select
{
...
@@ -398,7 +399,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
...
@@ -398,7 +399,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
return
fmt
.
Errorf
(
"invalid cid: undefined"
)
return
fmt
.
Errorf
(
"invalid cid: undefined"
)
}
}
keyMH
:=
key
.
Hash
()
keyMH
:=
key
.
Hash
()
logger
.
Debugw
(
"providing"
,
"cid"
,
key
,
"mh"
,
LoggableProviderRecordBytes
(
keyMH
))
logger
.
Debugw
(
"providing"
,
"cid"
,
key
,
"mh"
,
internal
.
LoggableProviderRecordBytes
(
keyMH
))
// add self locally
// add self locally
dht
.
ProviderManager
.
AddProvider
(
ctx
,
keyMH
,
dht
.
self
)
dht
.
ProviderManager
.
AddProvider
(
ctx
,
keyMH
,
dht
.
self
)
...
@@ -448,7 +449,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
...
@@ -448,7 +449,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
wg
.
Add
(
1
)
wg
.
Add
(
1
)
go
func
(
p
peer
.
ID
)
{
go
func
(
p
peer
.
ID
)
{
defer
wg
.
Done
()
defer
wg
.
Done
()
logger
.
Debugf
(
"putProvider(%s, %s)"
,
LoggableProviderRecordBytes
(
keyMH
),
p
)
logger
.
Debugf
(
"putProvider(%s, %s)"
,
internal
.
LoggableProviderRecordBytes
(
keyMH
),
p
)
err
:=
dht
.
protoMessenger
.
PutProvider
(
ctx
,
p
,
keyMH
,
dht
.
host
)
err
:=
dht
.
protoMessenger
.
PutProvider
(
ctx
,
p
,
keyMH
,
dht
.
host
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Debug
(
err
)
logger
.
Debug
(
err
)
...
@@ -497,7 +498,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
...
@@ -497,7 +498,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
keyMH
:=
key
.
Hash
()
keyMH
:=
key
.
Hash
()
logger
.
Debugw
(
"finding providers"
,
"cid"
,
key
,
"mh"
,
LoggableProviderRecordBytes
(
keyMH
))
logger
.
Debugw
(
"finding providers"
,
"cid"
,
key
,
"mh"
,
internal
.
LoggableProviderRecordBytes
(
keyMH
))
go
dht
.
findProvidersAsyncRoutine
(
ctx
,
keyMH
,
count
,
peerOut
)
go
dht
.
findProvidersAsyncRoutine
(
ctx
,
keyMH
,
count
,
peerOut
)
return
peerOut
return
peerOut
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment