Commit 544e4796 authored by Brian Tiger Chow's avatar Brian Tiger Chow

wip with DHT

@whyrusleeping @jbenet this is a WIP with the DHT.

wip

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>

Conflicts:
	epictest/addcat_test.go
	exchange/bitswap/testnet/peernet.go
	exchange/bitswap/testutils.go
	routing/mock/centralized_server.go
	routing/mock/centralized_test.go
	routing/mock/interface.go

fix(routing/mock) fill in function definition
parent ea3d2cf6
package dht package dht
import ( import (
"math"
"sync" "sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
...@@ -127,6 +128,15 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { ...@@ -127,6 +128,15 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
return nil return nil
} }
// FindProviders searches until the context expires.
func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerInfo, error) {
var providers []peer.PeerInfo
for p := range dht.FindProvidersAsync(ctx, key, math.MaxInt32) {
providers = append(providers, p)
}
return providers, nil
}
// FindProvidersAsync is the same thing as FindProviders, but returns a channel. // FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before // Peers will be returned on the channel as soon as they are found, even before
// the search query completes. // the search query completes.
......
...@@ -5,9 +5,11 @@ import ( ...@@ -5,9 +5,11 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing" routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/testutil"
) )
var log = u.Logger("mockrouter") var log = u.Logger("mockrouter")
...@@ -15,7 +17,7 @@ var log = u.Logger("mockrouter") ...@@ -15,7 +17,7 @@ var log = u.Logger("mockrouter")
type client struct { type client struct {
datastore ds.Datastore datastore ds.Datastore
server server server server
peer peer.PeerInfo peer testutil.Peer
} }
// FIXME(brian): is this method meant to simulate putting a value into the network? // FIXME(brian): is this method meant to simulate putting a value into the network?
...@@ -70,7 +72,11 @@ func (c *client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-cha ...@@ -70,7 +72,11 @@ func (c *client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-cha
// Provide returns once the message is on the network. Value is not necessarily // Provide returns once the message is on the network. Value is not necessarily
// visible yet. // visible yet.
func (c *client) Provide(_ context.Context, key u.Key) error { func (c *client) Provide(_ context.Context, key u.Key) error {
return c.server.Announce(c.peer, key) info := peer.PeerInfo{
ID: c.peer.ID(),
Addrs: []ma.Multiaddr{c.peer.Address()},
}
return c.server.Announce(info, key)
} }
var _ routing.IpfsRouting = &client{} var _ routing.IpfsRouting = &client{}
...@@ -5,9 +5,11 @@ import ( ...@@ -5,9 +5,11 @@ import (
"sync" "sync"
"time" "time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/testutil"
) )
// server is the mockrouting.Client's private interface to the routing server // server is the mockrouting.Client's private interface to the routing server
...@@ -71,11 +73,11 @@ func (rs *s) Providers(k u.Key) []peer.PeerInfo { ...@@ -71,11 +73,11 @@ func (rs *s) Providers(k u.Key) []peer.PeerInfo {
return ret return ret
} }
func (rs *s) Client(p peer.PeerInfo) Client { func (rs *s) Client(p testutil.Peer) Client {
return rs.ClientWithDatastore(p, ds.NewMapDatastore()) return rs.ClientWithDatastore(context.Background(), p, ds.NewMapDatastore())
} }
func (rs *s) ClientWithDatastore(p peer.PeerInfo, datastore ds.Datastore) Client { func (rs *s) ClientWithDatastore(_ context.Context, p testutil.Peer, datastore ds.Datastore) Client {
return &client{ return &client{
peer: p, peer: p,
datastore: ds.NewMapDatastore(), datastore: ds.NewMapDatastore(),
......
...@@ -8,11 +8,12 @@ import ( ...@@ -8,11 +8,12 @@ import (
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay" delay "github.com/jbenet/go-ipfs/util/delay"
"github.com/jbenet/go-ipfs/util/testutil"
) )
func TestKeyNotFound(t *testing.T) { func TestKeyNotFound(t *testing.T) {
var pi = peer.PeerInfo{ID: peer.ID("the peer id")} var pi = testutil.RandPeerOrFatal(t)
var key = u.Key("mock key") var key = u.Key("mock key")
var ctx = context.Background() var ctx = context.Background()
...@@ -25,7 +26,7 @@ func TestKeyNotFound(t *testing.T) { ...@@ -25,7 +26,7 @@ func TestKeyNotFound(t *testing.T) {
} }
func TestClientFindProviders(t *testing.T) { func TestClientFindProviders(t *testing.T) {
pi := peer.PeerInfo{ID: peer.ID("42")} pi := testutil.RandPeerOrFatal(t)
rs := NewServer() rs := NewServer()
client := rs.Client(pi) client := rs.Client(pi)
...@@ -39,20 +40,6 @@ func TestClientFindProviders(t *testing.T) { ...@@ -39,20 +40,6 @@ func TestClientFindProviders(t *testing.T) {
time.Sleep(time.Millisecond * 300) time.Sleep(time.Millisecond * 300)
max := 100 max := 100
providersFromHashTable, err := rs.Client(pi).FindProviders(context.Background(), k)
if err != nil {
t.Fatal(err)
}
isInHT := false
for _, pi := range providersFromHashTable {
if pi.ID == pi.ID {
isInHT = true
}
}
if !isInHT {
t.Fatal("Despite client providing key, peer wasn't in hash table as a provider")
}
providersFromClient := client.FindProvidersAsync(context.Background(), u.Key("hello"), max) providersFromClient := client.FindProvidersAsync(context.Background(), u.Key("hello"), max)
isInClient := false isInClient := false
for pi := range providersFromClient { for pi := range providersFromClient {
...@@ -70,7 +57,7 @@ func TestClientOverMax(t *testing.T) { ...@@ -70,7 +57,7 @@ func TestClientOverMax(t *testing.T) {
k := u.Key("hello") k := u.Key("hello")
numProvidersForHelloKey := 100 numProvidersForHelloKey := 100
for i := 0; i < numProvidersForHelloKey; i++ { for i := 0; i < numProvidersForHelloKey; i++ {
pi := peer.PeerInfo{ID: peer.ID(i)} pi := testutil.RandPeerOrFatal(t)
err := rs.Client(pi).Provide(context.Background(), k) err := rs.Client(pi).Provide(context.Background(), k)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -78,7 +65,7 @@ func TestClientOverMax(t *testing.T) { ...@@ -78,7 +65,7 @@ func TestClientOverMax(t *testing.T) {
} }
max := 10 max := 10
pi := peer.PeerInfo{ID: peer.ID("TODO")} pi := testutil.RandPeerOrFatal(t)
client := rs.Client(pi) client := rs.Client(pi)
providersFromClient := client.FindProvidersAsync(context.Background(), k, max) providersFromClient := client.FindProvidersAsync(context.Background(), k, max)
...@@ -113,8 +100,11 @@ func TestCanceledContext(t *testing.T) { ...@@ -113,8 +100,11 @@ func TestCanceledContext(t *testing.T) {
default: default:
} }
pi := peer.PeerInfo{ID: peer.ID(i)} pi, err := testutil.RandPeer()
err := rs.Client(pi).Provide(context.Background(), k) if err != nil {
t.Error(err)
}
err = rs.Client(pi).Provide(context.Background(), k)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
...@@ -122,7 +112,7 @@ func TestCanceledContext(t *testing.T) { ...@@ -122,7 +112,7 @@ func TestCanceledContext(t *testing.T) {
} }
}() }()
local := peer.PeerInfo{ID: peer.ID("peer id doesn't matter")} local := testutil.RandPeerOrFatal(t)
client := rs.Client(local) client := rs.Client(local)
t.Log("warning: max is finite so this test is non-deterministic") t.Log("warning: max is finite so this test is non-deterministic")
...@@ -148,7 +138,7 @@ func TestCanceledContext(t *testing.T) { ...@@ -148,7 +138,7 @@ func TestCanceledContext(t *testing.T) {
func TestValidAfter(t *testing.T) { func TestValidAfter(t *testing.T) {
var pi = peer.PeerInfo{ID: peer.ID("the peer id")} pi := testutil.RandPeerOrFatal(t)
var key = u.Key("mock key") var key = u.Key("mock key")
var ctx = context.Background() var ctx = context.Background()
conf := DelayConfig{ conf := DelayConfig{
......
...@@ -11,18 +11,18 @@ import ( ...@@ -11,18 +11,18 @@ import (
routing "github.com/jbenet/go-ipfs/routing" routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay" delay "github.com/jbenet/go-ipfs/util/delay"
"github.com/jbenet/go-ipfs/util/testutil"
) )
// Server provides mockrouting Clients // Server provides mockrouting Clients
type Server interface { type Server interface {
Client(p peer.PeerInfo) Client Client(p testutil.Peer) Client
ClientWithDatastore(peer.PeerInfo, ds.Datastore) Client ClientWithDatastore(context.Context, testutil.Peer, ds.Datastore) Client
} }
// Client implements IpfsRouting // Client implements IpfsRouting
type Client interface { type Client interface {
FindProviders(context.Context, u.Key) ([]peer.PeerInfo, error) FindProviders(context.Context, u.Key) ([]peer.PeerInfo, error)
routing.IpfsRouting routing.IpfsRouting
} }
......
package mockrouting
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
mocknet "github.com/jbenet/go-ipfs/net/mock"
dht "github.com/jbenet/go-ipfs/routing/dht"
"github.com/jbenet/go-ipfs/util/testutil"
)
type mocknetserver struct {
mn mocknet.Mocknet
}
func NewDHTNetwork(mn mocknet.Mocknet) Server {
return &mocknetserver{
mn: mn,
}
}
func (rs *mocknetserver) Client(p testutil.Peer) Client {
return rs.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore())
}
func (rs *mocknetserver) ClientWithDatastore(ctx context.Context, p testutil.Peer, ds ds.Datastore) Client {
// FIXME AddPeer doesn't appear to be idempotent
net, err := rs.mn.AddPeer(p.PrivateKey(), p.Address())
if err != nil {
panic("FIXME")
// return nil, debugerror.Wrap(err)
}
return dht.NewDHT(ctx, p.ID(), net, sync.MutexWrap(ds))
}
var _ Server = &mocknetserver{}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment