Commit 61f13ea7 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by Juan Batiz-Benet

begin planning of identification process

parent 8203d2c0
// The identify package handles how peers identify with eachother upon
// connection to the network
package identify
import (
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
)
// Perform initial communication with this peer to share node ID's and
// initiate communication
func Handshake(self *peer.Peer, conn *swarm.Conn) error {
// temporary:
// put your own id in a 16byte buffer and send that over to
// the peer as your ID, then wait for them to send their ID.
// Once that trade is finished, the handshake is complete and
// both sides should 'trust' each other
id := make([]byte, 16)
copy(id, self.ID)
conn.Outgoing.MsgChan <- id
resp := <-conn.Incoming.MsgChan
conn.Peer.ID = peer.ID(resp)
return nil
}
message Identify {
required bytes id = 1;
}
......@@ -2,10 +2,14 @@ package dht
import (
"sync"
"time"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
identify "github.com/jbenet/go-ipfs/identify"
ma "github.com/jbenet/go-multiaddr"
ds "github.com/jbenet/datastore.go"
......@@ -35,15 +39,44 @@ type IpfsDHT struct {
shutdown chan struct{}
}
func NewDHT(p *peer.Peer) *IpfsDHT {
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
dht := new(IpfsDHT)
dht.self = p
dht.network = swarm.NewSwarm(p)
//TODO: should Listen return an error?
dht.network.Listen()
dht.datastore = ds.NewMapDatastore()
dht.self = p
dht.listeners = make(map[uint64]chan *swarm.Message)
dht.shutdown = make(chan struct{})
return dht
return dht, nil
}
// Connect to a new peer at the given address
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error {
peer := new(peer.Peer)
peer.AddAddress(addr)
conn,err := swarm.Dial("tcp", peer)
if err != nil {
return err
}
err = identify.Handshake(dht.self, conn)
if err != nil {
return err
}
dht.network.StartConn(conn.Peer.Key(), conn)
// TODO: Add this peer to our routing table
return nil
}
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
func (dht *IpfsDHT) handleMessages() {
......@@ -134,11 +167,9 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
resp := new(DHTMessage)
resp.Id = pmes.Id
resp.Response = &isResponse
resp.Type = pmes.Type
mes := new(swarm.Message)
mes.Peer = p
mes.Data = []byte(resp.String())
dht.network.Chan.Outgoing <- mes
dht.network.Chan.Outgoing <-swarm.NewMessage(p, []byte(resp.String()))
}
......@@ -162,9 +193,36 @@ func (dht *IpfsDHT) Unlisten(mesid uint64) {
close(ch)
}
// Stop all communications from this node and shut down
func (dht *IpfsDHT) Halt() {
dht.shutdown <- struct{}{}
dht.network.Close()
}
// Ping a node, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) {
// Thoughts: maybe this should accept an ID and do a peer lookup?
id := GenerateMessageID()
mes_type := DHTMessage_PING
pmes := new(DHTMessage)
pmes.Id = &id
pmes.Type = &mes_type
mes := new(swarm.Message)
mes.Peer = p
mes.Data = []byte(pmes.String())
before := time.Now()
response_chan := dht.ListenFor(id)
dht.network.Chan.Outgoing <- mes
tout := time.After(timeout)
select {
case <-response_chan:
roundtrip := time.Since(before)
u.DOut("Ping took %s.", roundtrip.String())
case <-tout:
// Timed out, think about removing node from network
u.DOut("Ping node timed out.")
}
}
......@@ -2,11 +2,12 @@ package swarm
import (
"fmt"
"net"
"sync"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
"net"
"sync"
)
// Message represents a packet of information sent to or received from a
......@@ -19,6 +20,14 @@ type Message struct {
Data []byte
}
// Cleaner looking helper function to make a new message struct
func NewMessage(p *peer.Peer, data []byte) *Message {
return &Message{
Peer: p,
Data: data,
}
}
// Chan is a swam channel, which provides duplex communication and errors.
type Chan struct {
Outgoing chan *Message
......@@ -87,7 +96,8 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error {
for {
nconn, err := list.Accept()
if err != nil {
u.PErr("Failed to accept connection: %s - %s", netstr, addr)
u.PErr("Failed to accept connection: %s - %s [%s]", netstr,
addr, err)
return
}
go s.handleNewConn(nconn)
......@@ -99,7 +109,27 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error {
// Handle getting ID from this peer and adding it into the map
func (s *Swarm) handleNewConn(nconn net.Conn) {
panic("Not yet implemented!")
p := MakePeerFromConn(nconn)
var addr *ma.Multiaddr
//naddr := nconn.RemoteAddr()
//addr := ma.FromDialArgs(naddr.Network(), naddr.String())
conn := &Conn{
Peer: p,
Addr: addr,
Conn: nconn,
}
newConnChans(conn)
go s.fanIn(conn)
}
// Negotiate with peer for its ID and create a peer object
// TODO: this might belong in the peer package
func MakePeerFromConn(conn net.Conn) *peer.Peer {
panic("Not yet implemented.")
}
// Close closes a swarm.
......@@ -140,6 +170,11 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) {
return nil, err
}
s.StartConn(k, conn)
return conn, nil
}
func (s *Swarm) StartConn(k u.Key, conn *Conn) {
// add to conns
s.connsLock.Lock()
s.conns[k] = conn
......@@ -147,7 +182,6 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) {
// kick off reader goroutine
go s.fanIn(conn)
return conn, nil
}
// Handles the unwrapping + sending of messages to the right connection.
......@@ -165,7 +199,8 @@ func (s *Swarm) fanOut() {
conn, found := s.conns[msg.Peer.Key()]
s.connsLock.RUnlock()
if !found {
e := fmt.Errorf("Sent msg to peer without open conn: %v", msg.Peer)
e := fmt.Errorf("Sent msg to peer without open conn: %v",
msg.Peer)
s.Chan.Errors <- e
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment