Commit 248e06f7 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

working towards Providers implementation

parent a85ce3fa
...@@ -3,6 +3,7 @@ package dht ...@@ -3,6 +3,7 @@ package dht
import ( import (
"sync" "sync"
"time" "time"
"encoding/json"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm" swarm "github.com/jbenet/go-ipfs/swarm"
...@@ -31,6 +32,10 @@ type IpfsDHT struct { ...@@ -31,6 +32,10 @@ type IpfsDHT struct {
// Local data // Local data
datastore ds.Datastore datastore ds.Datastore
// Map keys to peers that can provide their value
// TODO: implement a TTL on each of these keys
providers map[u.Key][]*peer.Peer
// map of channels waiting for reply messages // map of channels waiting for reply messages
listeners map[uint64]chan *swarm.Message listeners map[uint64]chan *swarm.Message
listenLock sync.RWMutex listenLock sync.RWMutex
...@@ -185,11 +190,44 @@ func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) { ...@@ -185,11 +190,44 @@ func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
} }
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) { func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
panic("Not implemented.") providers := dht.providers[u.Key(pmes.GetKey())]
if providers == nil || len(providers) == 0 {
// ?????
}
var addrs []string
for _,prov := range providers {
ma := prov.NetAddress("tcp")
str,err := ma.String()
if err != nil {
u.PErr("Error: %s", err)
continue
}
addrs = append(addrs, str)
}
data,err := json.Marshal(addrs)
if err != nil {
panic(err)
}
resp := pDHTMessage{
Type: DHTMessage_GET_PROVIDERS,
Key: pmes.GetKey(),
Value: data,
Id: pmes.GetId(),
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <-mes
} }
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) { func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
panic("Not implemented.") //TODO: need to implement TTLs on providers
key := u.Key(pmes.GetKey())
parr := dht.providers[key]
dht.providers[key] = append(parr, p)
} }
......
...@@ -11,6 +11,9 @@ import ( ...@@ -11,6 +11,9 @@ import (
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
) )
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6
// TODO: determine a way of creating and managing message IDs // TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 { func GenerateMessageID() uint64 {
return uint64(rand.Uint32()) << 32 & uint64(rand.Uint32()) return uint64(rand.Uint32()) << 32 & uint64(rand.Uint32())
...@@ -25,8 +28,6 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error { ...@@ -25,8 +28,6 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
var p *peer.Peer var p *peer.Peer
p = s.routes.NearestPeer(convertKey(key)) p = s.routes.NearestPeer(convertKey(key))
if p == nil { if p == nil {
u.POut("nbuckets: %d", len(s.routes.Buckets))
u.POut("%d", s.routes.Buckets[0].Len())
panic("Table returned nil peer!") panic("Table returned nil peer!")
} }
...@@ -64,7 +65,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { ...@@ -64,7 +65,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
timeup := time.After(timeout) timeup := time.After(timeout)
select { select {
case <-timeup: case <-timeup:
// TODO: unregister listener s.Unlisten(pmes.Id)
return nil, u.ErrTimeout return nil, u.ErrTimeout
case resp := <-response_chan: case resp := <-response_chan:
pmes_out := new(DHTMessage) pmes_out := new(DHTMessage)
...@@ -81,17 +82,80 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { ...@@ -81,17 +82,80 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
// Announce that this node can provide value for given key // Announce that this node can provide value for given key
func (s *IpfsDHT) Provide(key u.Key) error { func (s *IpfsDHT) Provide(key u.Key) error {
return u.ErrNotImplemented peers := s.routes.NearestPeers(convertKey(key), PoolSize)
if len(peers) == 0 {
//return an error
}
pmes := pDHTMessage{
Type: DHTMessage_ADD_PROVIDER,
Key: string(key),
}
pbmes := pmes.ToProtobuf()
for _,p := range peers {
mes := swarm.NewMessage(p, pbmes)
s.network.Chan.Outgoing <-mes
}
return nil
} }
// FindProviders searches for peers who can provide the value for given key. // FindProviders searches for peers who can provide the value for given key.
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error) { func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
return nil, u.ErrNotImplemented p := s.routes.NearestPeer(convertKey(key))
pmes := pDHTMessage{
Type: DHTMessage_GET_PROVIDERS,
Key: string(key),
Id: GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id)
s.network.Chan.Outgoing <-mes
after := time.After(timeout)
select {
case <-after:
s.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp := <-listen_chan:
pmes_out := new(DHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil, err
}
panic("Not yet implemented.")
}
} }
// Find specific Peer // Find specific Peer
// FindPeer searches for a peer with given ID. // FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) { func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
return nil, u.ErrNotImplemented p := s.routes.NearestPeer(convertPeerID(id))
pmes := pDHTMessage{
Type: DHTMessage_FIND_NODE,
Key: string(id),
Id: GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id)
s.network.Chan.Outgoing <-mes
after := time.After(timeout)
select {
case <-after:
s.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp := <-listen_chan:
pmes_out := new(DHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil, err
}
panic("Not yet implemented.")
}
} }
...@@ -25,7 +25,7 @@ type IpfsRouting interface { ...@@ -25,7 +25,7 @@ type IpfsRouting interface {
Provide(key u.Key) error Provide(key u.Key) error
// FindProviders searches for peers who can provide the value for given key. // FindProviders searches for peers who can provide the value for given key.
FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error)
// Find specific Peer // Find specific Peer
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment