Commit 07e7f9a2 authored by Brian Tiger Chow's avatar Brian Tiger Chow

refactor(mockrouting) misc

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>
parent 3aaf6463
package mockrouting
import (
"errors"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util"
)
var log = u.Logger("mockrouter")
type client struct {
datastore ds.Datastore
server server
peer peer.Peer
}
// FIXME(brian): is this method meant to simulate putting a value into the network?
func (c *client) PutValue(ctx context.Context, key u.Key, val []byte) error {
log.Debugf("PutValue: %s", key)
return c.datastore.Put(key.DsKey(), val)
}
// FIXME(brian): is this method meant to simulate getting a value from the network?
func (c *client) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
log.Debugf("GetValue: %s", key)
v, err := c.datastore.Get(key.DsKey())
if err != nil {
return nil, err
}
data, ok := v.([]byte)
if !ok {
return nil, errors.New("could not cast value from datastore")
}
return data, nil
}
func (c *client) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer, error) {
return c.server.Providers(key), nil
}
func (c *client) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) {
log.Debugf("FindPeer: %s", pid)
return nil, nil
}
func (c *client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.Peer {
out := make(chan peer.Peer)
go func() {
defer close(out)
for i, p := range c.server.Providers(k) {
if max <= i {
return
}
select {
case out <- p:
case <-ctx.Done():
return
}
}
}()
return out
}
func (c *client) Provide(_ context.Context, key u.Key) error {
return c.server.Announce(c.peer, key)
}
var _ routing.IpfsRouting = &client{}
// Package mock provides a virtual routing server. To use it, create a virtual
// routing server and use the Client() method to get a routing client
// (IpfsRouting). The server quacks like a DHT but is really a local in-memory
// hash table.
package mockrouting
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay"
)
// Server provides mockrouting Clients
type Server interface {
Client(p peer.Peer) Client
ClientWithDatastore(peer.Peer, ds.Datastore) Client
}
// Client implements IpfsRouting
type Client interface {
FindProviders(context.Context, u.Key) ([]peer.Peer, error)
routing.IpfsRouting
}
// NewServer returns a mockrouting Server
func NewServer() Server {
return NewServerWithDelay(delay.Fixed(0))
}
// NewServerWithDelay returns a mockrouting Server with a delay!
func NewServerWithDelay(d delay.D) Server {
return &s{
providers: make(map[u.Key]peer.Map),
delay: d,
}
}
package mock
package mockrouting
import (
"bytes"
......@@ -12,37 +12,21 @@ import (
func TestKeyNotFound(t *testing.T) {
vrs := VirtualRoutingServer()
empty := vrs.Providers(u.Key("not there"))
if len(empty) != 0 {
t.Fatal("should be empty")
}
}
var peer = testutil.NewPeerWithID(peer.ID([]byte("the peer id")))
var key = u.Key("mock key")
var ctx = context.Background()
func TestSetAndGet(t *testing.T) {
pid := peer.ID([]byte("the peer id"))
p := testutil.NewPeerWithID(pid)
k := u.Key("42")
rs := VirtualRoutingServer()
err := rs.Announce(p, k)
if err != nil {
t.Fatal(err)
}
providers := rs.Providers(k)
if len(providers) != 1 {
t.Fatal("should be one")
rs := NewServer()
providers := rs.Client(peer).FindProvidersAsync(ctx, key, 10)
_, ok := <-providers
if ok {
t.Fatal("should be closed")
}
for _, elem := range providers {
if bytes.Equal(elem.ID(), pid) {
return
}
}
t.Fatal("ID should have matched")
}
func TestClientFindProviders(t *testing.T) {
peer := testutil.NewPeerWithIDString("42")
rs := VirtualRoutingServer()
rs := NewServer()
client := rs.Client(peer)
k := u.Key("hello")
......@@ -52,7 +36,10 @@ func TestClientFindProviders(t *testing.T) {
}
max := 100
providersFromHashTable := rs.Providers(k)
providersFromHashTable, err := rs.Client(peer).FindProviders(context.Background(), k)
if err != nil {
t.Fatal(err)
}
isInHT := false
for _, p := range providersFromHashTable {
......@@ -76,21 +63,16 @@ func TestClientFindProviders(t *testing.T) {
}
func TestClientOverMax(t *testing.T) {
rs := VirtualRoutingServer()
rs := NewServer()
k := u.Key("hello")
numProvidersForHelloKey := 100
for i := 0; i < numProvidersForHelloKey; i++ {
peer := testutil.NewPeerWithIDString(string(i))
err := rs.Announce(peer, k)
err := rs.Client(peer).Provide(context.Background(), k)
if err != nil {
t.Fatal(err)
}
}
providersFromHashTable := rs.Providers(k)
if len(providersFromHashTable) != numProvidersForHelloKey {
t.Log(1 == len(providersFromHashTable))
t.Fatal("not all providers were returned")
}
max := 10
peer := testutil.NewPeerWithIDString("TODO")
......@@ -108,7 +90,7 @@ func TestClientOverMax(t *testing.T) {
// TODO does dht ensure won't receive self as a provider? probably not.
func TestCanceledContext(t *testing.T) {
rs := VirtualRoutingServer()
rs := NewServer()
k := u.Key("hello")
t.Log("async'ly announce infinite stream of providers for key")
......@@ -116,7 +98,7 @@ func TestCanceledContext(t *testing.T) {
go func() { // infinite stream
for {
peer := testutil.NewPeerWithIDString(string(i))
err := rs.Announce(peer, k)
err := rs.Client(peer).Provide(context.Background(), k)
if err != nil {
t.Fatal(err)
}
......
package mock
import (
"errors"
"math/rand"
"sync"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util"
)
var log = u.Logger("mockrouter")
var _ routing.IpfsRouting = &MockRouter{}
type MockRouter struct {
datastore ds.Datastore
hashTable RoutingServer
peer peer.Peer
}
func NewMockRouter(local peer.Peer, dstore ds.Datastore) routing.IpfsRouting {
return &MockRouter{
datastore: dstore,
peer: local,
hashTable: VirtualRoutingServer(),
}
}
func (mr *MockRouter) PutValue(ctx context.Context, key u.Key, val []byte) error {
log.Debugf("PutValue: %s", key)
return mr.datastore.Put(key.DsKey(), val)
}
func (mr *MockRouter) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
log.Debugf("GetValue: %s", key)
v, err := mr.datastore.Get(key.DsKey())
if err != nil {
return nil, err
}
data, ok := v.([]byte)
if !ok {
return nil, errors.New("could not cast value from datastore")
}
return data, nil
}
func (mr *MockRouter) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer, error) {
return nil, nil
}
func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) {
log.Debugf("FindPeer: %s", pid)
return nil, nil
}
func (mr *MockRouter) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.Peer {
out := make(chan peer.Peer)
go func() {
defer close(out)
for i, p := range mr.hashTable.Providers(k) {
if max <= i {
return
}
select {
case out <- p:
case <-ctx.Done():
return
}
}
}()
return out
}
func (mr *MockRouter) Provide(_ context.Context, key u.Key) error {
return mr.hashTable.Announce(mr.peer, key)
}
type RoutingServer interface {
Announce(peer.Peer, u.Key) error
Providers(u.Key) []peer.Peer
Client(p peer.Peer) routing.IpfsRouting
}
func VirtualRoutingServer() RoutingServer {
return &hashTable{
providers: make(map[u.Key]peer.Map),
}
}
type hashTable struct {
lock sync.RWMutex
providers map[u.Key]peer.Map
}
func (rs *hashTable) Announce(p peer.Peer, k u.Key) error {
rs.lock.Lock()
defer rs.lock.Unlock()
_, ok := rs.providers[k]
if !ok {
rs.providers[k] = make(peer.Map)
}
rs.providers[k][p.Key()] = p
return nil
}
func (rs *hashTable) Providers(k u.Key) []peer.Peer {
rs.lock.RLock()
defer rs.lock.RUnlock()
var ret []peer.Peer
peerset, ok := rs.providers[k]
if !ok {
return ret
}
for _, peer := range peerset {
ret = append(ret, peer)
}
for i := range ret {
j := rand.Intn(i + 1)
ret[i], ret[j] = ret[j], ret[i]
}
return ret
}
func (rs *hashTable) Client(p peer.Peer) routing.IpfsRouting {
return &MockRouter{
peer: p,
hashTable: rs,
}
}
package mockrouting
import (
"math/rand"
"sync"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay"
)
// server is the mockrouting.Client's private interface to the routing server
type server interface {
Announce(peer.Peer, u.Key) error
Providers(u.Key) []peer.Peer
Server
}
// s is an implementation of the private server interface
type s struct {
delay delay.D
lock sync.RWMutex
providers map[u.Key]peer.Map
}
func (rs *s) Announce(p peer.Peer, k u.Key) error {
rs.delay.Wait() // before locking
rs.lock.Lock()
defer rs.lock.Unlock()
_, ok := rs.providers[k]
if !ok {
rs.providers[k] = make(peer.Map)
}
rs.providers[k][p.Key()] = p
return nil
}
func (rs *s) Providers(k u.Key) []peer.Peer {
rs.delay.Wait() // before locking
rs.lock.RLock()
defer rs.lock.RUnlock()
var ret []peer.Peer
peerset, ok := rs.providers[k]
if !ok {
return ret
}
for _, peer := range peerset {
ret = append(ret, peer)
}
for i := range ret {
j := rand.Intn(i + 1)
ret[i], ret[j] = ret[j], ret[i]
}
return ret
}
func (rs *s) Client(p peer.Peer) Client {
return rs.ClientWithDatastore(p, ds.NewMapDatastore())
}
func (rs *s) ClientWithDatastore(p peer.Peer, datastore ds.Datastore) Client {
return &client{
peer: p,
datastore: ds.NewMapDatastore(),
server: rs,
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment