Commit 748d08d0 authored by Jeromy, committed by Juan Batiz-Benet

finish implementation of Put and Get for DHT

parent 25b81459
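This commit wires a routing table into the DHT and lands a first end-to-end Put/Get path. In rough strokes the new API is driven the way the test added below drives it; a minimal sketch (peer address and ID are illustrative, error handling elided):

	// Sketch of the new Put/Get API, mirroring the test added in this commit.
	addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234")
	p := new(peer.Peer)
	p.AddAddress(addr)
	p.ID = peer.ID([]byte("peer_a"))

	dht, err := NewDHT(p) // the routing table is now created inside NewDHT
	if err != nil {
		panic(err)
	}
	dht.Start()

	// Put routes the value to the nearest known peer; Get asks for it back,
	// waiting up to the given timeout for a response.
	err = dht.PutValue("hello", []byte("world"))
	val, err := dht.GetValue("hello", time.Second*2)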
@@ -21,7 +21,7 @@ import (
 // IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
 // It is used to implement the base IpfsRouting module.
 type IpfsDHT struct {
-	routes RoutingTable
+	routes *RoutingTable
 
 	network *swarm.Swarm
@@ -53,6 +53,7 @@ func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
 	dht.self = p
 	dht.listeners = make(map[uint64]chan *swarm.Message)
 	dht.shutdown = make(chan struct{})
+	dht.routes = NewRoutingTable(20, convertPeerID(p.ID))
 
 	return dht, nil
 }
@@ -78,14 +79,14 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error {
 	dht.network.StartConn(conn)
 
-	// TODO: Add this peer to our routing table
+	dht.routes.Update(peer)
 	return nil
 }
 
 // Read in all messages from swarm and handle them appropriately
 // NOTE: this function is just a quick sketch
 func (dht *IpfsDHT) handleMessages() {
-	u.DOut("Being message handling routine")
+	u.DOut("Begin message handling routine")
 	for {
 		select {
 		case mes := <-dht.network.Chan.Incoming:
@@ -98,6 +99,9 @@ func (dht *IpfsDHT) handleMessages() {
 				continue
 			}
 
+			// Update the peer's latest visit in the routing table
+			dht.routes.Update(mes.Peer)
+
 			// Note: not sure if this is the correct place for this
 			if pmes.GetResponse() {
 				dht.listenLock.RLock()
...
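The pmes.GetResponse() branch above is where a reply finds its way back to a caller blocked in GetValue. The dispatch code itself is truncated out of this diff; a plausible reconstruction from the listeners map and lock set up in NewDHT (the dispatchResponse name is invented here for illustration):

	// Assumed shape of response dispatch; not the commit's actual code.
	func (dht *IpfsDHT) dispatchResponse(mes *swarm.Message, pmes *DHTMessage) {
		dht.listenLock.RLock()
		ch, ok := dht.listeners[pmes.GetId()]
		dht.listenLock.RUnlock()
		if ok {
			// Hand the raw message to whoever registered this ID via ListenFor.
			ch <- mes
		}
	}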
@@ -6,9 +6,13 @@ import (
 	ma "github.com/jbenet/go-multiaddr"
 	u "github.com/jbenet/go-ipfs/util"
 
+	"fmt"
 	"time"
 )
 
+var _ = fmt.Println
+
 func TestPing(t *testing.T) {
 	u.Debug = false
 	addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234")
@@ -38,7 +42,6 @@ func TestPing(t *testing.T) {
 		t.Fatal(err)
 	}
 
-
 	dht_a.Start()
 	dht_b.Start()
@@ -52,4 +55,59 @@ func TestPing(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+
+	dht_a.Halt()
+	dht_b.Halt()
+}
+
+func TestValueGetSet(t *testing.T) {
+	u.Debug = false
+	addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235")
+	if err != nil {
+		t.Fatal(err)
+	}
+	addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	peer_a := new(peer.Peer)
+	peer_a.AddAddress(addr_a)
+	peer_a.ID = peer.ID([]byte("peer_a"))
+
+	peer_b := new(peer.Peer)
+	peer_b.AddAddress(addr_b)
+	peer_b.ID = peer.ID([]byte("peer_b"))
+
+	dht_a, err := NewDHT(peer_a)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	dht_b, err := NewDHT(peer_b)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	dht_a.Start()
+	dht_b.Start()
+
+	err = dht_a.Connect(addr_b)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = dht_a.PutValue("hello", []byte("world"))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	val, err := dht_a.GetValue("hello", time.Second * 2)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if string(val) != "world" {
+		t.Fatalf("Expected 'world' got %s", string(val))
+	}
 }
@@ -4,6 +4,8 @@ import (
 	"math/rand"
 	"time"
 
+	proto "code.google.com/p/goprotobuf/proto"
+
 	peer "github.com/jbenet/go-ipfs/peer"
 	swarm "github.com/jbenet/go-ipfs/swarm"
 	u "github.com/jbenet/go-ipfs/util"
@@ -22,21 +24,20 @@ func GenerateMessageID() uint64 {
 func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
 	var p *peer.Peer
 	p = s.routes.NearestPeer(convertKey(key))
+	if p == nil {
+		u.POut("nbuckets: %d", len(s.routes.Buckets))
+		u.POut("%d", s.routes.Buckets[0].Len())
+		panic("Table returned nil peer!")
+	}
 
-	pmes_type := DHTMessage_PUT_VALUE
-	str_key := string(key)
-	mes_id := GenerateMessageID()
-
-	pmes := new(DHTMessage)
-	pmes.Type = &pmes_type
-	pmes.Key = &str_key
-	pmes.Value = value
-	pmes.Id = &mes_id
-
-	mes := new(swarm.Message)
-	mes.Data = []byte(pmes.String())
-	mes.Peer = p
+	pmes := pDHTMessage{
+		Type:  DHTMessage_PUT_VALUE,
+		Key:   string(key),
+		Value: value,
+		Id:    GenerateMessageID(),
+	}
+
+	mes := swarm.NewMessage(p, pmes.ToProtobuf())
 	s.network.Chan.Outgoing <- mes
 	return nil
 }
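PutValue now builds a pDHTMessage value instead of assembling a DHTMessage protobuf by hand. The helper's definition is not part of this diff; a sketch inferred from the fields used here and from the pointer assignments it replaces:

	// Hypothetical reconstruction of pDHTMessage; the field set and the enum
	// type name are inferred from usage, not taken from the commit.
	type pDHTMessage struct {
		Type     DHTMessage_MessageType
		Key      string
		Value    []byte
		Response bool
		Id       uint64
	}

	// ToProtobuf would do the pointer bookkeeping the old inline code did.
	func (m *pDHTMessage) ToProtobuf() *DHTMessage {
		pmes := new(DHTMessage)
		pmes.Type = &m.Type
		pmes.Key = &m.Key
		pmes.Value = m.Value
		pmes.Response = &m.Response
		pmes.Id = &m.Id
		return pmes
	}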
@@ -45,21 +46,19 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
 func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
 	var p *peer.Peer
 	p = s.routes.NearestPeer(convertKey(key))
+	if p == nil {
+		panic("Table returned nil peer!")
+	}
 
-	str_key := string(key)
-	mes_type := DHTMessage_GET_VALUE
-	mes_id := GenerateMessageID()
-	// protobuf structure
-	pmes := new(DHTMessage)
-	pmes.Type = &mes_type
-	pmes.Key = &str_key
-	pmes.Id = &mes_id
-
-	mes := new(swarm.Message)
-	mes.Data = []byte(pmes.String())
-	mes.Peer = p
-
-	response_chan := s.ListenFor(*pmes.Id)
+	pmes := pDHTMessage{
+		Type: DHTMessage_GET_VALUE,
+		Key:  string(key),
+		Id:   GenerateMessageID(),
+	}
+	response_chan := s.ListenFor(pmes.Id)
+
+	mes := swarm.NewMessage(p, pmes.ToProtobuf())
+	s.network.Chan.Outgoing <- mes
 
 	// Wait for either the response or a timeout
 	timeup := time.After(timeout)
@@ -68,7 +67,12 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
 		// TODO: unregister listener
 		return nil, u.ErrTimeout
 	case resp := <-response_chan:
-		return resp.Data, nil
+		pmes_out := new(DHTMessage)
+		err := proto.Unmarshal(resp.Data, pmes_out)
+		if err != nil {
+			return nil, err
+		}
+		return pmes_out.GetValue(), nil
 	}
 }
...
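GetValue registers interest in the reply before sending, via ListenFor. Its implementation is outside this diff; given that dht.listeners is a map[uint64]chan *swarm.Message guarded by listenLock (see NewDHT above), it plausibly looks like:

	// Plausible shape of ListenFor; the actual code is not in this commit.
	func (s *IpfsDHT) ListenFor(mesid uint64) chan *swarm.Message {
		lchan := make(chan *swarm.Message)
		s.listenLock.Lock()
		s.listeners[mesid] = lchan
		s.listenLock.Unlock()
		return lchan
	}

Registering before the send matters: if the reply raced back before the listener existed, handleMessages would have nowhere to deliver it.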
@@ -49,7 +49,7 @@ func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer {
 		new_bucket := bucket.Split(b_id, rt.local)
 		rt.Buckets = append(rt.Buckets, new_bucket)
 		if new_bucket.Len() > rt.bucketsize {
-			// This is another very rare and annoying case
+			// TODO: This is a very rare and annoying case
 			panic("Case not handled.")
 		}
@@ -87,10 +87,27 @@ func (p peerSorterArr) Less(a, b int) bool {
 }
 
 //
+func (rt *RoutingTable) copyPeersFromList(peerArr peerSorterArr, peerList *list.List) peerSorterArr {
+	for e := peerList.Front(); e != nil; e = e.Next() {
+		p := e.Value.(*peer.Peer)
+		p_id := convertPeerID(p.ID)
+		pd := peerDistance{
+			p:        p,
+			distance: xor(rt.local, p_id),
+		}
+		peerArr = append(peerArr, &pd)
+	}
+	return peerArr
+}
+
 // Returns a single peer that is nearest to the given ID
 func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer {
 	peers := rt.NearestPeers(id, 1)
-	return peers[0]
+	if len(peers) > 0 {
+		return peers[0]
+	} else {
+		return nil
+	}
 }
 
 // Returns a list of the 'count' closest peers to the given ID
@@ -100,26 +117,26 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
 	// Get bucket at cpl index or last bucket
 	var bucket *Bucket
 	if cpl >= len(rt.Buckets) {
-		bucket = rt.Buckets[len(rt.Buckets) - 1]
-	} else {
-		bucket = rt.Buckets[cpl]
+		cpl = len(rt.Buckets) - 1
 	}
+	bucket = rt.Buckets[cpl]
 
+	var peerArr peerSorterArr
 	if bucket.Len() == 0 {
-		// This can happen, very rarely.
-		panic("Case not yet implemented.")
-	}
-
-	var peerArr peerSorterArr
-	plist := (*list.List)(bucket)
-	for e := plist.Front(); e != nil; e = e.Next() {
-		p := e.Value.(*peer.Peer)
-		p_id := convertPeerID(p.ID)
-		pd := peerDistance{
-			p:        p,
-			distance: xor(rt.local, p_id),
-		}
-		peerArr = append(peerArr, &pd)
+		// In the case of an unusual split, one bucket may be empty.
+		// If this happens, search both surrounding buckets for the nearest peers.
+		if cpl > 0 {
+			plist := (*list.List)(rt.Buckets[cpl - 1])
+			peerArr = rt.copyPeersFromList(peerArr, plist)
+		}
+
+		if cpl < len(rt.Buckets) - 1 {
+			plist := (*list.List)(rt.Buckets[cpl + 1])
+			peerArr = rt.copyPeersFromList(peerArr, plist)
+		}
+	} else {
+		plist := (*list.List)(bucket)
+		peerArr = rt.copyPeersFromList(peerArr, plist)
 	}
 
 	// Sort by distance to local peer
...
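Why the empty-bucket fallback looks at buckets cpl-1 and cpl+1: a peer's bucket index is the length of the common bit prefix (cpl) between its ID and the local ID, so the adjacent buckets hold the next-nearest XOR distances. A sketch of the metric (xor and convertPeerID exist in the codebase, but the bodies below are illustrative assumptions):

	// Illustrative XOR-metric helpers; bodies are assumed, names follow the diff.
	type ID []byte

	// xor is Kademlia's distance metric between two IDs.
	func xor(a, b ID) ID {
		c := make(ID, len(a))
		for i := 0; i < len(a); i++ {
			c[i] = a[i] ^ b[i]
		}
		return c
	}

	// commonPrefixLen counts the leading zero bits of xor(a, b): the longer
	// the shared prefix, the closer the peer, and the higher its bucket index.
	func commonPrefixLen(a, b ID) int {
		d := xor(a, b)
		for i := 0; i < len(d); i++ {
			for j := 0; j < 8; j++ {
				if d[i]&(0x80>>uint(j)) != 0 {
					return i*8 + j
				}
			}
		}
		return len(d) * 8
	}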