Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-p2p-kad-dht
Commits
c1e316a7
Commit
c1e316a7
authored
Jan 22, 2015
by
Jeromy
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
really ugly impl of 'ipfs dht query' command
parent
2ab1cc56
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
158 additions
and
75 deletions
+158
-75
lookup.go
lookup.go
+156
-0
routing.go
routing.go
+2
-75
No files found.
lookup.go
0 → 100644
View file @
c1e316a7
package
dht
import
(
"encoding/json"
context
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
peer
"github.com/jbenet/go-ipfs/p2p/peer"
kb
"github.com/jbenet/go-ipfs/routing/kbucket"
u
"github.com/jbenet/go-ipfs/util"
errors
"github.com/jbenet/go-ipfs/util/debugerror"
pset
"github.com/jbenet/go-ipfs/util/peerset"
)
type
QueryEventType
int
const
(
SendingQuery
QueryEventType
=
iota
PeerResponse
FinalPeer
)
type
QueryEvent
struct
{
ID
peer
.
ID
Type
QueryEventType
Responses
[]
*
peer
.
PeerInfo
}
func
pointerizePeerInfos
(
pis
[]
peer
.
PeerInfo
)
[]
*
peer
.
PeerInfo
{
out
:=
make
([]
*
peer
.
PeerInfo
,
len
(
pis
))
for
i
,
p
:=
range
pis
{
np
:=
p
out
[
i
]
=
&
np
}
return
out
}
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
func
(
dht
*
IpfsDHT
)
GetClosestPeers
(
ctx
context
.
Context
,
key
u
.
Key
,
events
chan
<-
*
QueryEvent
)
(
<-
chan
peer
.
ID
,
error
)
{
e
:=
log
.
EventBegin
(
ctx
,
"getClosestPeers"
,
&
key
)
tablepeers
:=
dht
.
routingTable
.
NearestPeers
(
kb
.
ConvertKey
(
key
),
AlphaValue
)
if
len
(
tablepeers
)
==
0
{
return
nil
,
errors
.
Wrap
(
kb
.
ErrLookupFailure
)
}
out
:=
make
(
chan
peer
.
ID
,
KValue
)
peerset
:=
pset
.
NewLimited
(
KValue
)
for
_
,
p
:=
range
tablepeers
{
select
{
case
out
<-
p
:
case
<-
ctx
.
Done
()
:
return
nil
,
ctx
.
Err
()
}
peerset
.
Add
(
p
)
}
query
:=
dht
.
newQuery
(
key
,
func
(
ctx
context
.
Context
,
p
peer
.
ID
)
(
*
dhtQueryResult
,
error
)
{
// For DHT query command
select
{
case
events
<-
&
QueryEvent
{
Type
:
SendingQuery
,
ID
:
p
,
}
:
}
closer
,
err
:=
dht
.
closerPeersSingle
(
ctx
,
key
,
p
)
if
err
!=
nil
{
log
.
Errorf
(
"error getting closer peers: %s"
,
err
)
return
nil
,
err
}
var
filtered
[]
peer
.
PeerInfo
for
_
,
clp
:=
range
closer
{
if
kb
.
Closer
(
clp
,
dht
.
self
,
key
)
&&
peerset
.
TryAdd
(
clp
)
{
select
{
case
out
<-
clp
:
log
.
Error
(
"Sending out peer: %s"
,
clp
.
Pretty
())
case
<-
ctx
.
Done
()
:
return
nil
,
ctx
.
Err
()
}
filtered
=
append
(
filtered
,
dht
.
peerstore
.
PeerInfo
(
clp
))
}
}
log
.
Errorf
(
"filtered: %v"
,
filtered
)
// For DHT query command
select
{
case
events
<-
&
QueryEvent
{
Type
:
PeerResponse
,
ID
:
p
,
Responses
:
pointerizePeerInfos
(
filtered
),
}
:
}
return
&
dhtQueryResult
{
closerPeers
:
filtered
},
nil
})
go
func
()
{
defer
close
(
out
)
defer
e
.
Done
()
// run it!
_
,
err
:=
query
.
Run
(
ctx
,
tablepeers
)
if
err
!=
nil
{
log
.
Debugf
(
"closestPeers query run error: %s"
,
err
)
}
}()
return
out
,
nil
}
func
(
dht
*
IpfsDHT
)
closerPeersSingle
(
ctx
context
.
Context
,
key
u
.
Key
,
p
peer
.
ID
)
([]
peer
.
ID
,
error
)
{
pmes
,
err
:=
dht
.
findPeerSingle
(
ctx
,
p
,
peer
.
ID
(
key
))
if
err
!=
nil
{
return
nil
,
err
}
var
out
[]
peer
.
ID
for
_
,
pbp
:=
range
pmes
.
GetCloserPeers
()
{
pid
:=
peer
.
ID
(
pbp
.
GetId
())
if
pid
!=
dht
.
self
{
// dont add self
dht
.
peerstore
.
AddAddresses
(
pid
,
pbp
.
Addresses
())
out
=
append
(
out
,
pid
)
}
}
return
out
,
nil
}
func
(
qe
*
QueryEvent
)
MarshalJSON
()
([]
byte
,
error
)
{
out
:=
make
(
map
[
string
]
interface
{})
out
[
"ID"
]
=
peer
.
IDB58Encode
(
qe
.
ID
)
out
[
"Type"
]
=
int
(
qe
.
Type
)
out
[
"Responses"
]
=
qe
.
Responses
return
json
.
Marshal
(
out
)
}
func
(
qe
*
QueryEvent
)
UnmarshalJSON
(
b
[]
byte
)
error
{
temp
:=
struct
{
ID
string
Type
int
Responses
[]
*
peer
.
PeerInfo
}{}
err
:=
json
.
Unmarshal
(
b
,
&
temp
)
if
err
!=
nil
{
return
err
}
pid
,
err
:=
peer
.
IDB58Decode
(
temp
.
ID
)
if
err
!=
nil
{
return
err
}
qe
.
ID
=
pid
qe
.
Type
=
QueryEventType
(
temp
.
Type
)
qe
.
Responses
=
temp
.
Responses
return
nil
}
routing.go
View file @
c1e316a7
...
...
@@ -48,7 +48,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
return
err
}
pchan
,
err
:=
dht
.
g
etClosestPeers
(
ctx
,
key
)
pchan
,
err
:=
dht
.
G
etClosestPeers
(
ctx
,
key
,
nil
)
if
err
!=
nil
{
return
err
}
...
...
@@ -134,7 +134,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
// add self locally
dht
.
providers
.
AddProvider
(
key
,
dht
.
self
)
peers
,
err
:=
dht
.
g
etClosestPeers
(
ctx
,
key
)
peers
,
err
:=
dht
.
G
etClosestPeers
(
ctx
,
key
,
nil
)
if
err
!=
nil
{
return
err
}
...
...
@@ -164,79 +164,6 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn
return
providers
,
nil
}
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
func
(
dht
*
IpfsDHT
)
getClosestPeers
(
ctx
context
.
Context
,
key
u
.
Key
)
(
<-
chan
peer
.
ID
,
error
)
{
e
:=
log
.
EventBegin
(
ctx
,
"getClosestPeers"
,
&
key
)
tablepeers
:=
dht
.
routingTable
.
ListPeers
()
if
len
(
tablepeers
)
==
0
{
return
nil
,
errors
.
Wrap
(
kb
.
ErrLookupFailure
)
}
out
:=
make
(
chan
peer
.
ID
,
KValue
)
peerset
:=
pset
.
NewLimited
(
KValue
)
for
_
,
p
:=
range
tablepeers
{
select
{
case
out
<-
p
:
case
<-
ctx
.
Done
()
:
return
nil
,
ctx
.
Err
()
}
peerset
.
Add
(
p
)
}
query
:=
dht
.
newQuery
(
key
,
func
(
ctx
context
.
Context
,
p
peer
.
ID
)
(
*
dhtQueryResult
,
error
)
{
closer
,
err
:=
dht
.
closerPeersSingle
(
ctx
,
key
,
p
)
if
err
!=
nil
{
log
.
Errorf
(
"error getting closer peers: %s"
,
err
)
return
nil
,
err
}
var
filtered
[]
peer
.
PeerInfo
for
_
,
p
:=
range
closer
{
if
kb
.
Closer
(
p
,
dht
.
self
,
key
)
&&
peerset
.
TryAdd
(
p
)
{
select
{
case
out
<-
p
:
case
<-
ctx
.
Done
()
:
return
nil
,
ctx
.
Err
()
}
filtered
=
append
(
filtered
,
dht
.
peerstore
.
PeerInfo
(
p
))
}
}
return
&
dhtQueryResult
{
closerPeers
:
filtered
},
nil
})
go
func
()
{
defer
close
(
out
)
defer
e
.
Done
()
// run it!
_
,
err
:=
query
.
Run
(
ctx
,
tablepeers
)
if
err
!=
nil
{
log
.
Debugf
(
"closestPeers query run error: %s"
,
err
)
}
}()
return
out
,
nil
}
func
(
dht
*
IpfsDHT
)
closerPeersSingle
(
ctx
context
.
Context
,
key
u
.
Key
,
p
peer
.
ID
)
([]
peer
.
ID
,
error
)
{
pmes
,
err
:=
dht
.
findPeerSingle
(
ctx
,
p
,
peer
.
ID
(
key
))
if
err
!=
nil
{
return
nil
,
err
}
var
out
[]
peer
.
ID
for
_
,
pbp
:=
range
pmes
.
GetCloserPeers
()
{
pid
:=
peer
.
ID
(
pbp
.
GetId
())
if
pid
!=
dht
.
self
{
// dont add self
dht
.
peerstore
.
AddAddresses
(
pid
,
pbp
.
Addresses
())
out
=
append
(
out
,
pid
)
}
}
return
out
,
nil
}
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment