Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-dms3-routing
Commits
aab538ff
Commit
aab538ff
authored
10 years ago
by
Juan Batiz-Benet
Committed by
Brian Tiger Chow
10 years ago
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Fixed connections all over.
parent
164f56cd
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
186 additions
and
187 deletions
+186
-187
dht/Message.go
dht/Message.go
+1
-1
dht/dht.go
dht/dht.go
+16
-6
dht/dht_test.go
dht/dht_test.go
+158
-172
dht/handlers.go
dht/handlers.go
+1
-0
dht/query.go
dht/query.go
+8
-8
dht/routing.go
dht/routing.go
+2
-0
No files found.
dht/Message.go
View file @
aab538ff
...
...
@@ -46,7 +46,7 @@ func peersToPBPeers(peers []*peer.Peer) []*Message_Peer {
func
(
m
*
Message
)
GetClusterLevel
()
int
{
level
:=
m
.
GetClusterLevelRaw
()
-
1
if
level
<
0
{
u
.
PErr
(
"
handleGetValue
: no routing level specified, assuming 0
\n
"
)
u
.
PErr
(
"
GetClusterLevel
: no routing level specified, assuming 0
\n
"
)
level
=
0
}
return
int
(
level
)
...
...
This diff is collapsed.
Click to expand it.
dht/dht.go
View file @
aab538ff
...
...
@@ -125,7 +125,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.
dht
.
Update
(
mPeer
)
// Print out diagnostic
u
.
DOut
(
"[peer: %s]
\n
Got message type: '%s' [from = %s]
\n
"
,
u
.
DOut
(
"[peer: %s]
Got message type: '%s' [from = %s]
\n
"
,
dht
.
self
.
ID
.
Pretty
(),
Message_MessageType_name
[
int32
(
pmes
.
GetType
())],
mPeer
.
ID
.
Pretty
())
...
...
@@ -141,6 +141,11 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.
return
nil
,
err
}
// if nil response, return it before serializing
if
rpmes
==
nil
{
return
nil
,
nil
}
// serialize response msg
rmes
,
err
:=
msg
.
FromObject
(
mPeer
,
rpmes
)
if
err
!=
nil
{
...
...
@@ -161,6 +166,11 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message
start
:=
time
.
Now
()
// Print out diagnostic
u
.
DOut
(
"[peer: %s] Sent message type: '%s' [to = %s]
\n
"
,
dht
.
self
.
ID
.
Pretty
(),
Message_MessageType_name
[
int32
(
pmes
.
GetType
())],
p
.
ID
.
Pretty
())
rmes
,
err
:=
dht
.
sender
.
SendRequest
(
ctx
,
mes
)
if
err
!=
nil
{
return
nil
,
err
...
...
@@ -209,10 +219,10 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
return
nil
,
nil
,
err
}
u
.
P
Out
(
"pmes.GetValue() %v
\n
"
,
pmes
.
GetValue
())
u
.
D
Out
(
"pmes.GetValue() %v
\n
"
,
pmes
.
GetValue
())
if
value
:=
pmes
.
GetValue
();
value
!=
nil
{
// Success! We were given the value
u
.
P
Out
(
"getValueOrPeers: got value
\n
"
)
u
.
D
Out
(
"getValueOrPeers: got value
\n
"
)
return
value
,
nil
,
nil
}
...
...
@@ -222,7 +232,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
if
err
!=
nil
{
return
nil
,
nil
,
err
}
u
.
P
Out
(
"getValueOrPeers: get from providers
\n
"
)
u
.
D
Out
(
"getValueOrPeers: get from providers
\n
"
)
return
val
,
nil
,
nil
}
...
...
@@ -250,11 +260,11 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
}
if
len
(
peers
)
>
0
{
u
.
P
Out
(
"getValueOrPeers: peers
\n
"
)
u
.
D
Out
(
"getValueOrPeers: peers
\n
"
)
return
nil
,
peers
,
nil
}
u
.
P
Out
(
"getValueOrPeers: u.ErrNotFound
\n
"
)
u
.
D
Out
(
"getValueOrPeers: u.ErrNotFound
\n
"
)
return
nil
,
nil
,
u
.
ErrNotFound
}
...
...
This diff is collapsed.
Click to expand it.
dht/dht_test.go
View file @
aab538ff
package
dht
// import (
// "testing"
//
// context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
//
// ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
// ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
//
// ci "github.com/jbenet/go-ipfs/crypto"
// spipe "github.com/jbenet/go-ipfs/crypto/spipe"
// inet "github.com/jbenet/go-ipfs/net"
// mux "github.com/jbenet/go-ipfs/net/mux"
// netservice "github.com/jbenet/go-ipfs/net/service"
// peer "github.com/jbenet/go-ipfs/peer"
// u "github.com/jbenet/go-ipfs/util"
//
// "bytes"
// "fmt"
// "time"
// )
//
// func setupDHT(t *testing.T, p *peer.Peer) *IpfsDHT {
// ctx := context.TODO()
//
// peerstore := peer.NewPeerstore()
//
// ctx, _ = context.WithCancel(ctx)
// dhts := netservice.NewService(nil) // nil handler for now, need to patch it
// if err := dhts.Start(ctx); err != nil {
// t.Fatal(err)
// }
//
// net, err := inet.NewIpfsNetwork(context.TODO(), p, &mux.ProtocolMap{
// mux.ProtocolID_Routing: dhts,
// })
// if err != nil {
// t.Fatal(err)
// }
//
// d := NewDHT(p, peerstore, net, dhts, ds.NewMapDatastore())
// dhts.Handler = d
// return d
// }
//
// func setupDHTS(n int, t *testing.T) ([]*ma.Multiaddr, []*peer.Peer, []*IpfsDHT) {
// var addrs []*ma.Multiaddr
// for i := 0; i < 4; i++ {
// a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i))
// if err != nil {
// t.Fatal(err)
// }
// addrs = append(addrs, a)
// }
//
// var peers []*peer.Peer
// for i := 0; i < 4; i++ {
// p := new(peer.Peer)
// p.AddAddress(addrs[i])
// sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
// if err != nil {
// panic(err)
// }
// p.PubKey = pk
// p.PrivKey = sk
// id, err := spipe.IDFromPubKey(pk)
// if err != nil {
// panic(err)
// }
// p.ID = id
// peers = append(peers, p)
// }
//
// var dhts []*IpfsDHT
// for i := 0; i < 4; i++ {
// dhts[i] = setupDHT(t, peers[i])
// }
//
// return addrs, peers, dhts
// }
//
// func makePeer(addr *ma.Multiaddr) *peer.Peer {
// p := new(peer.Peer)
// p.AddAddress(addr)
// sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
// if err != nil {
// panic(err)
// }
// p.PrivKey = sk
// p.PubKey = pk
// id, err := spipe.IDFromPubKey(pk)
// if err != nil {
// panic(err)
// }
//
// p.ID = id
// return p
// }
//
// func TestPing(t *testing.T) {
// u.Debug = true
// addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/2222")
// if err != nil {
// t.Fatal(err)
// }
// addrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678")
// if err != nil {
// t.Fatal(err)
// }
//
// peerA := makePeer(addrA)
// peerB := makePeer(addrB)
//
// dhtA := setupDHT(t, peerA)
// dhtB := setupDHT(t, peerB)
//
// defer dhtA.Halt()
// defer dhtB.Halt()
//
// _, err = dhtA.Connect(addrB)
// if err != nil {
// t.Fatal(err)
// }
//
// //Test that we can ping the node
// err = dhtA.Ping(peerB, time.Second*2)
// if err != nil {
// t.Fatal(err)
// }
// }
//
// func TestValueGetSet(t *testing.T) {
// u.Debug = false
// addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235")
// if err != nil {
// t.Fatal(err)
// }
// addrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679")
// if err != nil {
// t.Fatal(err)
// }
//
// peerA := makePeer(addrA)
// peerB := makePeer(addrB)
//
// dhtA := setupDHT(t, peerA)
// dhtB := setupDHT(t, peerB)
//
// defer dhtA.Halt()
// defer dhtB.Halt()
//
// _, err = dhtA.Connect(addrB)
// if err != nil {
// t.Fatal(err)
// }
//
// dhtA.PutValue("hello", []byte("world"))
//
// val, err := dhtA.GetValue("hello", time.Second*2)
// if err != nil {
// t.Fatal(err)
// }
//
// if string(val) != "world" {
// t.Fatalf("Expected 'world' got '%s'", string(val))
// }
//
// }
//
import
(
"testing"
context
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
ma
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
ci
"github.com/jbenet/go-ipfs/crypto"
spipe
"github.com/jbenet/go-ipfs/crypto/spipe"
inet
"github.com/jbenet/go-ipfs/net"
mux
"github.com/jbenet/go-ipfs/net/mux"
netservice
"github.com/jbenet/go-ipfs/net/service"
peer
"github.com/jbenet/go-ipfs/peer"
u
"github.com/jbenet/go-ipfs/util"
"fmt"
"time"
)
func
setupDHT
(
t
*
testing
.
T
,
p
*
peer
.
Peer
)
*
IpfsDHT
{
ctx
,
_
:=
context
.
WithCancel
(
context
.
TODO
())
peerstore
:=
peer
.
NewPeerstore
()
dhts
:=
netservice
.
NewService
(
nil
)
// nil handler for now, need to patch it
if
err
:=
dhts
.
Start
(
ctx
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
net
,
err
:=
inet
.
NewIpfsNetwork
(
ctx
,
p
,
&
mux
.
ProtocolMap
{
mux
.
ProtocolID_Routing
:
dhts
,
})
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
d
:=
NewDHT
(
p
,
peerstore
,
net
,
dhts
,
ds
.
NewMapDatastore
())
dhts
.
Handler
=
d
return
d
}
func
setupDHTS
(
n
int
,
t
*
testing
.
T
)
([]
*
ma
.
Multiaddr
,
[]
*
peer
.
Peer
,
[]
*
IpfsDHT
)
{
var
addrs
[]
*
ma
.
Multiaddr
for
i
:=
0
;
i
<
4
;
i
++
{
a
,
err
:=
ma
.
NewMultiaddr
(
fmt
.
Sprintf
(
"/ip4/127.0.0.1/tcp/%d"
,
5000
+
i
))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
addrs
=
append
(
addrs
,
a
)
}
var
peers
[]
*
peer
.
Peer
for
i
:=
0
;
i
<
4
;
i
++
{
p
:=
makePeer
(
addrs
[
i
])
peers
=
append
(
peers
,
p
)
}
var
dhts
[]
*
IpfsDHT
for
i
:=
0
;
i
<
4
;
i
++
{
dhts
[
i
]
=
setupDHT
(
t
,
peers
[
i
])
}
return
addrs
,
peers
,
dhts
}
func
makePeer
(
addr
*
ma
.
Multiaddr
)
*
peer
.
Peer
{
p
:=
new
(
peer
.
Peer
)
p
.
AddAddress
(
addr
)
sk
,
pk
,
err
:=
ci
.
GenerateKeyPair
(
ci
.
RSA
,
512
)
if
err
!=
nil
{
panic
(
err
)
}
p
.
PrivKey
=
sk
p
.
PubKey
=
pk
id
,
err
:=
spipe
.
IDFromPubKey
(
pk
)
if
err
!=
nil
{
panic
(
err
)
}
p
.
ID
=
id
return
p
}
func
TestPing
(
t
*
testing
.
T
)
{
u
.
Debug
=
true
addrA
,
err
:=
ma
.
NewMultiaddr
(
"/ip4/127.0.0.1/tcp/2222"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
addrB
,
err
:=
ma
.
NewMultiaddr
(
"/ip4/127.0.0.1/tcp/5678"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
peerA
:=
makePeer
(
addrA
)
peerB
:=
makePeer
(
addrB
)
dhtA
:=
setupDHT
(
t
,
peerA
)
dhtB
:=
setupDHT
(
t
,
peerB
)
defer
dhtA
.
Halt
()
defer
dhtB
.
Halt
()
_
,
err
=
dhtA
.
Connect
(
peerB
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
//Test that we can ping the node
err
=
dhtA
.
Ping
(
peerB
,
time
.
Second
*
2
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
func
TestValueGetSet
(
t
*
testing
.
T
)
{
u
.
Debug
=
false
addrA
,
err
:=
ma
.
NewMultiaddr
(
"/ip4/127.0.0.1/tcp/1235"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
addrB
,
err
:=
ma
.
NewMultiaddr
(
"/ip4/127.0.0.1/tcp/5679"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
peerA
:=
makePeer
(
addrA
)
peerB
:=
makePeer
(
addrB
)
dhtA
:=
setupDHT
(
t
,
peerA
)
dhtB
:=
setupDHT
(
t
,
peerB
)
defer
dhtA
.
Halt
()
defer
dhtB
.
Halt
()
_
,
err
=
dhtA
.
Connect
(
peerB
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
dhtA
.
PutValue
(
"hello"
,
[]
byte
(
"world"
))
val
,
err
:=
dhtA
.
GetValue
(
"hello"
,
time
.
Second
*
2
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
string
(
val
)
!=
"world"
{
t
.
Fatalf
(
"Expected 'world' got '%s'"
,
string
(
val
))
}
}
// func TestProvides(t *testing.T) {
// u.Debug = false
//
//
addrs, _
, dhts := setupDHTS(4, t)
//
_, peers
, dhts := setupDHTS(4, t)
// defer func() {
// for i := 0; i < 4; i++ {
// dhts[i].Halt()
// }
// }()
//
// _, err := dhts[0].Connect(
add
rs[1])
// _, err := dhts[0].Connect(
pee
rs[1])
// if err != nil {
// t.Fatal(err)
// }
//
// _, err = dhts[1].Connect(
add
rs[2])
// _, err = dhts[1].Connect(
pee
rs[2])
// if err != nil {
// t.Fatal(err)
// }
//
// _, err = dhts[1].Connect(
add
rs[3])
// _, err = dhts[1].Connect(
pee
rs[3])
// if err != nil {
// t.Fatal(err)
// }
...
...
This diff is collapsed.
Click to expand it.
dht/handlers.go
View file @
aab538ff
...
...
@@ -97,6 +97,7 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error
defer
dht
.
dslock
.
Unlock
()
dskey
:=
ds
.
NewKey
(
pmes
.
GetKey
())
err
:=
dht
.
datastore
.
Put
(
dskey
,
pmes
.
GetValue
())
u
.
DOut
(
"[%s] handlePutValue %v %v"
,
dht
.
self
.
ID
.
Pretty
(),
dskey
,
pmes
.
GetValue
())
return
nil
,
err
}
...
...
This diff is collapsed.
Click to expand it.
dht/query.go
View file @
aab538ff
...
...
@@ -163,7 +163,7 @@ func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
r
.
peersSeen
[
next
.
Key
()]
=
next
r
.
Unlock
()
u
.
P
Out
(
"adding peer to query: %v
\n
"
,
next
.
ID
.
Pretty
())
u
.
D
Out
(
"adding peer to query: %v
\n
"
,
next
.
ID
.
Pretty
())
// do this after unlocking to prevent possible deadlocks.
r
.
peersRemaining
.
Increment
(
1
)
...
...
@@ -187,14 +187,14 @@ func (r *dhtQueryRunner) spawnWorkers() {
if
!
more
{
return
// channel closed.
}
u
.
P
Out
(
"spawning worker for: %v
\n
"
,
p
.
ID
.
Pretty
())
u
.
D
Out
(
"spawning worker for: %v
\n
"
,
p
.
ID
.
Pretty
())
go
r
.
queryPeer
(
p
)
}
}
}
func
(
r
*
dhtQueryRunner
)
queryPeer
(
p
*
peer
.
Peer
)
{
u
.
P
Out
(
"spawned worker for: %v
\n
"
,
p
.
ID
.
Pretty
())
u
.
D
Out
(
"spawned worker for: %v
\n
"
,
p
.
ID
.
Pretty
())
// make sure we rate limit concurrency.
select
{
...
...
@@ -204,33 +204,33 @@ func (r *dhtQueryRunner) queryPeer(p *peer.Peer) {
return
}
u
.
P
Out
(
"running worker for: %v
\n
"
,
p
.
ID
.
Pretty
())
u
.
D
Out
(
"running worker for: %v
\n
"
,
p
.
ID
.
Pretty
())
// finally, run the query against this peer
res
,
err
:=
r
.
query
.
qfunc
(
r
.
ctx
,
p
)
if
err
!=
nil
{
u
.
P
Out
(
"ERROR worker for: %v %v
\n
"
,
p
.
ID
.
Pretty
(),
err
)
u
.
D
Out
(
"ERROR worker for: %v %v
\n
"
,
p
.
ID
.
Pretty
(),
err
)
r
.
Lock
()
r
.
errs
=
append
(
r
.
errs
,
err
)
r
.
Unlock
()
}
else
if
res
.
success
{
u
.
P
Out
(
"SUCCESS worker for: %v
\n
"
,
p
.
ID
.
Pretty
(),
res
)
u
.
D
Out
(
"SUCCESS worker for: %v
\n
"
,
p
.
ID
.
Pretty
(),
res
)
r
.
Lock
()
r
.
result
=
res
r
.
Unlock
()
r
.
cancel
()
// signal to everyone that we're done.
}
else
if
res
.
closerPeers
!=
nil
{
u
.
P
Out
(
"PEERS CLOSER -- worker for: %v
\n
"
,
p
.
ID
.
Pretty
())
u
.
D
Out
(
"PEERS CLOSER -- worker for: %v
\n
"
,
p
.
ID
.
Pretty
())
for
_
,
next
:=
range
res
.
closerPeers
{
r
.
addPeerToQuery
(
next
,
p
)
}
}
// signal we're done proccessing peer p
u
.
P
Out
(
"completing worker for: %v
\n
"
,
p
.
ID
.
Pretty
())
u
.
D
Out
(
"completing worker for: %v
\n
"
,
p
.
ID
.
Pretty
())
r
.
peersRemaining
.
Decrement
(
1
)
r
.
rateLimit
<-
struct
{}{}
}
This diff is collapsed.
Click to expand it.
dht/routing.go
View file @
aab538ff
...
...
@@ -30,6 +30,7 @@ func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error {
}
query
:=
newQuery
(
key
,
func
(
ctx
context
.
Context
,
p
*
peer
.
Peer
)
(
*
dhtQueryResult
,
error
)
{
u
.
DOut
(
"[%s] PutValue qry part %v
\n
"
,
dht
.
self
.
ID
.
Pretty
(),
p
.
ID
.
Pretty
())
err
:=
dht
.
putValueToNetwork
(
ctx
,
p
,
string
(
key
),
value
)
if
err
!=
nil
{
return
nil
,
err
...
...
@@ -38,6 +39,7 @@ func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error {
})
_
,
err
:=
query
.
Run
(
ctx
,
peers
)
u
.
DOut
(
"[%s] PutValue %v %v
\n
"
,
dht
.
self
.
ID
.
Pretty
(),
key
,
value
)
return
err
}
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment