Commit d574d518 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

add in message type routing to the swarm object. tired, needs cleanup.

parent f03d3626
......@@ -100,10 +100,11 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
func (dht *IpfsDHT) handleMessages() {
u.DOut("Begin message handling routine\n")
ch := dht.network.GetChan()
errs := dht.network.GetErrChan()
dhtmes := dht.network.GetChannel(swarm.PBWrapper_DHT_MESSAGE)
for {
select {
case mes, ok := <-ch.Incoming:
case mes, ok := <-dhtmes:
if !ok {
u.DOut("handleMessages closing, bad recv on incoming\n")
return
......@@ -147,7 +148,7 @@ func (dht *IpfsDHT) handleMessages() {
u.PErr("Recieved invalid message type")
}
case err := <-ch.Errors:
case err := <-errs:
u.PErr("dht err: %s\n", err)
case <-dht.shutdown:
return
......
......@@ -132,8 +132,8 @@ func TestValueGetSet(t *testing.T) {
dhtA.Start()
dhtB.Start()
errsa := dhtA.network.GetChan().Errors
errsb := dhtB.network.GetChan().Errors
errsa := dhtA.network.GetErrChan()
errsb := dhtB.network.GetErrChan()
go func() {
select {
case err := <-errsa:
......
......@@ -66,8 +66,12 @@ func (f *fauxNet) Send(mes *swarm.Message) {
f.Chan.Outgoing <- mes
}
func (f *fauxNet) GetChan() *swarm.Chan {
return f.Chan
func (f *fauxNet) GetErrChan() chan error {
return f.Chan.Errors
}
func (f *fauxNet) GetChannel(t swarm.PBWrapper_MessageType) chan *swarm.Message {
return f.Chan.Incoming
}
func (f *fauxNet) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
......@@ -167,7 +171,6 @@ func _randPeer() *peer.Peer {
}
func TestNotFound(t *testing.T) {
u.Debug = true
fn := newFauxNet()
fn.Listen()
......@@ -225,3 +228,64 @@ func TestNotFound(t *testing.T) {
}
t.Fatal("Expected to recieve an error.")
}
// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
u.Debug = false
fn := newFauxNet()
fn.Listen()
local := new(peer.Peer)
local.ID = peer.ID("test_peer")
d := NewDHT(local, fn)
d.Start()
var ps []*peer.Peer
for i := 0; i < 5; i++ {
ps = append(ps, _randPeer())
d.Update(ps[i])
}
other := _randPeer()
// Reply with random peers to every message
fn.AddHandler(func(mes *swarm.Message) *swarm.Message {
t.Log("Handling message...")
pmes := new(PBDHTMessage)
err := proto.Unmarshal(mes.Data, pmes)
if err != nil {
t.Fatal(err)
}
switch pmes.GetType() {
case PBDHTMessage_GET_VALUE:
resp := Message{
Type: pmes.GetType(),
ID: pmes.GetId(),
Response: true,
Success: false,
Peers: []*peer.Peer{other},
}
return swarm.NewMessage(mes.Peer, resp.ToProtobuf())
default:
panic("Shouldnt recieve this.")
}
})
_, err := d.GetValue(u.Key("hello"), time.Second*30)
if err != nil {
switch err {
case u.ErrNotFound:
//Success!
return
case u.ErrTimeout:
t.Fatal("Should not have gotten timeout!")
default:
t.Fatalf("Got unexpected error: %s", err)
}
}
t.Fatal("Expected to recieve an error.")
}
......@@ -3,24 +3,24 @@ package dht
import (
"time"
u "github.com/jbenet/go-ipfs/util"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
type ProviderManager struct {
providers map[u.Key][]*providerInfo
newprovs chan *addProv
getprovs chan *getProv
halt chan struct{}
newprovs chan *addProv
getprovs chan *getProv
halt chan struct{}
}
type addProv struct {
k u.Key
k u.Key
val *peer.Peer
}
type getProv struct {
k u.Key
k u.Key
resp chan []*peer.Peer
}
......@@ -55,7 +55,7 @@ func (pm *ProviderManager) run() {
for k, provs := range pm.providers {
var filtered []*providerInfo
for _, p := range provs {
if time.Now().Sub(p.Creation) < time.Hour * 24 {
if time.Now().Sub(p.Creation) < time.Hour*24 {
filtered = append(filtered, p)
}
}
......@@ -69,7 +69,7 @@ func (pm *ProviderManager) run() {
func (pm *ProviderManager) AddProvider(k u.Key, val *peer.Peer) {
pm.newprovs <- &addProv{
k: k,
k: k,
val: val,
}
}
......
......@@ -164,7 +164,8 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
case p := <-npeerChan:
count++
if count >= KValue {
break
errChan <- u.ErrNotFound
return
}
c.Increment()
......@@ -172,40 +173,38 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
default:
if c.Size() == 0 {
errChan <- u.ErrNotFound
return
}
}
}
}()
process := func() {
for {
select {
case p, ok := <-procPeer:
if !ok || p == nil {
c.Decrement()
return
}
val, peers, err := dht.getValueOrPeers(p, key, timeout/4, routeLevel)
if err != nil {
u.DErr("%v\n", err.Error())
c.Decrement()
continue
}
if val != nil {
valChan <- val
c.Decrement()
return
}
for p := range procPeer {
if p == nil {
c.Decrement()
return
}
val, peers, err := dht.getValueOrPeers(p, key, timeout/4, routeLevel)
if err != nil {
u.DErr("%v\n", err.Error())
c.Decrement()
continue
}
if val != nil {
valChan <- val
c.Decrement()
return
}
for _, np := range peers {
// TODO: filter out peers that arent closer
if !pset.Contains(np) && pset.Size() < KValue {
pset.Add(np) //This is racey... make a single function to do operation
npeerChan <- np
}
for _, np := range peers {
// TODO: filter out peers that arent closer
if !pset.Contains(np) && pset.Size() < KValue {
pset.Add(np) //This is racey... make a single function to do operation
npeerChan <- np
}
c.Decrement()
}
c.Decrement()
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment