Commit 92fb51d9 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by Juan Batiz-Benet

finish basic communcations between nodes and add a test of the ping operation

parent 8d98d4b4
......@@ -41,20 +41,22 @@ type IpfsDHT struct {
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
dht := new(IpfsDHT)
dht.network = swarm.NewSwarm(p)
//TODO: should Listen return an error?
dht.network.Listen()
network := swarm.NewSwarm(p)
err := network.Listen()
if err != nil {
return nil,err
}
dht := new(IpfsDHT)
dht.network = network
dht.datastore = ds.NewMapDatastore()
dht.self = p
dht.listeners = make(map[uint64]chan *swarm.Message)
dht.shutdown = make(chan struct{})
return dht, nil
}
// Start up background goroutines needed by the DHT
func (dht *IpfsDHT) Start() {
go dht.handleMessages()
}
......@@ -111,6 +113,7 @@ func (dht *IpfsDHT) handleMessages() {
}
//
u.DOut("Got message type: %d", pmes.GetType())
switch pmes.GetType() {
case DHTMessage_GET_VALUE:
dht.handleGetValue(mes.Peer, pmes)
......@@ -121,7 +124,7 @@ func (dht *IpfsDHT) handleMessages() {
case DHTMessage_ADD_PROVIDER:
case DHTMessage_GET_PROVIDERS:
case DHTMessage_PING:
dht.handleFindNode(mes.Peer, pmes)
dht.handlePing(mes.Peer, pmes)
}
case err := <-dht.network.Chan.Errors:
......@@ -136,18 +139,15 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
i_val, err := dht.datastore.Get(dskey)
if err == nil {
isResponse := true
resp := new(DHTMessage)
resp.Response = &isResponse
resp.Id = pmes.Id
resp.Key = pmes.Key
val := i_val.([]byte)
resp.Value = val
mes := new(swarm.Message)
mes.Peer = p
mes.Data = []byte(resp.String())
resp := &pDHTMessage{
Response: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: i_val.([]byte),
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <- mes
} else if err == ds.ErrNotFound {
// Find closest node(s) to desired key and reply with that info
// TODO: this will need some other metadata in the protobuf message
......@@ -167,13 +167,13 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
isResponse := true
resp := new(DHTMessage)
resp.Id = pmes.Id
resp.Response = &isResponse
resp.Type = pmes.Type
resp := &pDHTMessage{
Type: pmes.GetType(),
Response: true,
Id: pmes.GetId(),
}
dht.network.Chan.Outgoing <-swarm.NewMessage(p, []byte(resp.String()))
dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf())
}
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
......@@ -199,6 +199,7 @@ func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
return lchan
}
// Unregister the given message id from the listener map
func (dht *IpfsDHT) Unlisten(mesid uint64) {
dht.listenLock.Lock()
ch, ok := dht.listeners[mesid]
......@@ -216,31 +217,26 @@ func (dht *IpfsDHT) Halt() {
}
// Ping a node, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) {
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
u.DOut("Enter Ping.")
id := GenerateMessageID()
mes_type := DHTMessage_PING
pmes := new(DHTMessage)
pmes.Id = &id
pmes.Type = &mes_type
mes := new(swarm.Message)
mes.Peer = p
mes.Data = []byte(pmes.String())
pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
before := time.Now()
response_chan := dht.ListenFor(id)
response_chan := dht.ListenFor(pmes.Id)
dht.network.Chan.Outgoing <- mes
tout := time.After(timeout)
select {
case <-response_chan:
roundtrip := time.Since(before)
u.DOut("Ping took %s.", roundtrip.String())
u.POut("Ping took %s.", roundtrip.String())
return nil
case <-tout:
// Timed out, think about removing node from network
u.DOut("Ping node timed out.")
return u.ErrTimeout
}
}
package dht
import (
"testing"
peer "github.com/jbenet/go-ipfs/peer"
ma "github.com/jbenet/go-multiaddr"
u "github.com/jbenet/go-ipfs/util"
"time"
)
func TestPing(t *testing.T) {
u.Debug = false
addr_a,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234")
if err != nil {
t.Fatal(err)
}
addr_b,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678")
if err != nil {
t.Fatal(err)
}
peer_a := new(peer.Peer)
peer_a.AddAddress(addr_a)
peer_a.ID = peer.ID([]byte("peer_a"))
peer_b := new(peer.Peer)
peer_b.AddAddress(addr_b)
peer_b.ID = peer.ID([]byte("peer_b"))
dht_a,err := NewDHT(peer_a)
if err != nil {
t.Fatal(err)
}
dht_b,err := NewDHT(peer_b)
if err != nil {
t.Fatal(err)
}
dht_a.Start()
dht_b.Start()
err = dht_a.Connect(addr_b)
if err != nil {
t.Fatal(err)
}
//Test that we can ping the node
err = dht_a.Ping(peer_b, time.Second * 2)
if err != nil {
t.Fatal(err)
}
}
package dht
// A helper struct to make working with protbuf types easier
type pDHTMessage struct {
Type DHTMessage_MessageType
Key string
Value []byte
Response bool
Id uint64
}
func (m *pDHTMessage) ToProtobuf() *DHTMessage {
pmes := new(DHTMessage)
if m.Value != nil {
pmes.Value = m.Value
}
pmes.Type = &m.Type
pmes.Key = &m.Key
pmes.Response = &m.Response
pmes.Id = &m.Id
return pmes
}
......@@ -9,6 +9,7 @@ import (
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
ident "github.com/jbenet/go-ipfs/identify"
proto "code.google.com/p/goprotobuf/proto"
)
// Message represents a packet of information sent to or received from a
......@@ -22,10 +23,14 @@ type Message struct {
}
// Cleaner looking helper function to make a new message struct
func NewMessage(p *peer.Peer, data []byte) *Message {
func NewMessage(p *peer.Peer, data proto.Message) *Message {
bytes,err := proto.Marshal(data)
if err != nil {
panic(err)
}
return &Message{
Peer: p,
Data: data,
Data: bytes,
}
}
......@@ -47,6 +52,25 @@ func NewChan(bufsize int) *Chan {
}
}
// Contains a set of errors mapping to each of the swarms addresses
// that were listened on
type SwarmListenErr struct {
Errors []error
}
func (se *SwarmListenErr) Error() string {
if se == nil {
return "<nil error>"
}
var out string
for i,v := range se.Errors {
if v != nil {
out += fmt.Sprintf("%d: %s\n", i, v)
}
}
return out
}
// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
......@@ -71,13 +95,23 @@ func NewSwarm(local *peer.Peer) *Swarm {
}
// Open listeners for each network the swarm should listen on
func (s *Swarm) Listen() {
for _, addr := range s.local.Addresses {
func (s *Swarm) Listen() error {
var ret_err *SwarmListenErr
for i, addr := range s.local.Addresses {
err := s.connListen(addr)
if err != nil {
if ret_err != nil {
ret_err = new(SwarmListenErr)
ret_err.Errors = make([]error, len(s.local.Addresses))
}
ret_err.Errors[i] = err
u.PErr("Failed to listen on: %s [%s]", addr, err)
}
}
if ret_err == nil {
return nil
}
return ret_err
}
// Listen for new connections on the given multiaddr
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment