Commit 18b9efc9 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

providers interface is coming along nicely

parent 1082edeb
...@@ -61,6 +61,7 @@ func NewDHT(p *peer.Peer) (*IpfsDHT, error) { ...@@ -61,6 +61,7 @@ func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
dht.datastore = ds.NewMapDatastore() dht.datastore = ds.NewMapDatastore()
dht.self = p dht.self = p
dht.listeners = make(map[uint64]chan *swarm.Message) dht.listeners = make(map[uint64]chan *swarm.Message)
dht.providers = make(map[u.Key][]*peer.Peer)
dht.shutdown = make(chan struct{}) dht.shutdown = make(chan struct{})
dht.routes = NewRoutingTable(20, convertPeerID(p.ID)) dht.routes = NewRoutingTable(20, convertPeerID(p.ID))
return dht, nil return dht, nil
...@@ -101,9 +102,11 @@ func (dht *IpfsDHT) handleMessages() { ...@@ -101,9 +102,11 @@ func (dht *IpfsDHT) handleMessages() {
u.DOut("Begin message handling routine") u.DOut("Begin message handling routine")
for { for {
select { select {
case mes := <-dht.network.Chan.Incoming: case mes,ok := <-dht.network.Chan.Incoming:
u.DOut("recieved message from swarm.") if !ok {
u.DOut("handleMessages closing, bad recv on incoming")
return
}
pmes := new(DHTMessage) pmes := new(DHTMessage)
err := proto.Unmarshal(mes.Data, pmes) err := proto.Unmarshal(mes.Data, pmes)
if err != nil { if err != nil {
...@@ -121,15 +124,16 @@ func (dht *IpfsDHT) handleMessages() { ...@@ -121,15 +124,16 @@ func (dht *IpfsDHT) handleMessages() {
dht.listenLock.RUnlock() dht.listenLock.RUnlock()
if ok { if ok {
ch <- mes ch <- mes
} else {
// this is expected behaviour during a timeout
u.DOut("Received response with nobody listening...")
} }
// this is expected behaviour during a timeout
u.DOut("Received response with nobody listening...")
continue continue
} }
// //
u.DOut("Got message type: %d", pmes.GetType()) u.DOut("Got message type: '%s' [id = %x]", mesNames[pmes.GetType()], pmes.GetId())
switch pmes.GetType() { switch pmes.GetType() {
case DHTMessage_GET_VALUE: case DHTMessage_GET_VALUE:
dht.handleGetValue(mes.Peer, pmes) dht.handleGetValue(mes.Peer, pmes)
...@@ -138,13 +142,15 @@ func (dht *IpfsDHT) handleMessages() { ...@@ -138,13 +142,15 @@ func (dht *IpfsDHT) handleMessages() {
case DHTMessage_FIND_NODE: case DHTMessage_FIND_NODE:
dht.handleFindNode(mes.Peer, pmes) dht.handleFindNode(mes.Peer, pmes)
case DHTMessage_ADD_PROVIDER: case DHTMessage_ADD_PROVIDER:
dht.handleAddProvider(mes.Peer, pmes)
case DHTMessage_GET_PROVIDERS: case DHTMessage_GET_PROVIDERS:
dht.handleGetProviders(mes.Peer, pmes)
case DHTMessage_PING: case DHTMessage_PING:
dht.handlePing(mes.Peer, pmes) dht.handlePing(mes.Peer, pmes)
} }
case err := <-dht.network.Chan.Errors: case err := <-dht.network.Chan.Errors:
panic(err) u.DErr("dht err: %s", err)
case <-dht.shutdown: case <-dht.shutdown:
return return
} }
...@@ -197,13 +203,16 @@ func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) { ...@@ -197,13 +203,16 @@ func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
} }
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) { func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
dht.providerLock.RLock()
providers := dht.providers[u.Key(pmes.GetKey())] providers := dht.providers[u.Key(pmes.GetKey())]
dht.providerLock.RUnlock()
if providers == nil || len(providers) == 0 { if providers == nil || len(providers) == 0 {
// ????? // ?????
u.DOut("No known providers for requested key.")
} }
// This is just a quick hack, formalize method of sending addrs later // This is just a quick hack, formalize method of sending addrs later
var addrs []string addrs := make(map[u.Key]string)
for _,prov := range providers { for _,prov := range providers {
ma := prov.NetAddress("tcp") ma := prov.NetAddress("tcp")
str,err := ma.String() str,err := ma.String()
...@@ -212,7 +221,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) { ...@@ -212,7 +221,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
continue continue
} }
addrs = append(addrs, str) addrs[prov.Key()] = str
} }
data,err := json.Marshal(addrs) data,err := json.Marshal(addrs)
...@@ -225,6 +234,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) { ...@@ -225,6 +234,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
Key: pmes.GetKey(), Key: pmes.GetKey(),
Value: data, Value: data,
Id: pmes.GetId(), Id: pmes.GetId(),
Response: true,
} }
mes := swarm.NewMessage(p, resp.ToProtobuf()) mes := swarm.NewMessage(p, resp.ToProtobuf())
...@@ -234,8 +244,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) { ...@@ -234,8 +244,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) { func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
//TODO: need to implement TTLs on providers //TODO: need to implement TTLs on providers
key := u.Key(pmes.GetKey()) key := u.Key(pmes.GetKey())
parr := dht.providers[key] dht.addProviderEntry(key, p)
dht.providers[key] = append(parr, p)
} }
...@@ -290,3 +299,11 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { ...@@ -290,3 +299,11 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
return u.ErrTimeout return u.ErrTimeout
} }
} }
func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
u.DOut("Adding %s as provider for '%s'", p.Key().Pretty(), key)
dht.providerLock.Lock()
provs := dht.providers[key]
dht.providers[key] = append(provs, p)
dht.providerLock.Unlock()
}
...@@ -9,6 +9,17 @@ type pDHTMessage struct { ...@@ -9,6 +9,17 @@ type pDHTMessage struct {
Id uint64 Id uint64
} }
var mesNames [10]string
func init() {
mesNames[DHTMessage_ADD_PROVIDER] = "add provider"
mesNames[DHTMessage_FIND_NODE] = "find node"
mesNames[DHTMessage_GET_PROVIDERS] = "get providers"
mesNames[DHTMessage_GET_VALUE] = "get value"
mesNames[DHTMessage_PUT_VALUE] = "put value"
mesNames[DHTMessage_PING] = "ping"
}
func (m *pDHTMessage) ToProtobuf() *DHTMessage { func (m *pDHTMessage) ToProtobuf() *DHTMessage {
pmes := new(DHTMessage) pmes := new(DHTMessage)
if m.Value != nil { if m.Value != nil {
......
...@@ -19,7 +19,8 @@ var PoolSize = 6 ...@@ -19,7 +19,8 @@ var PoolSize = 6
// TODO: determine a way of creating and managing message IDs // TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 { func GenerateMessageID() uint64 {
return uint64(rand.Uint32()) << 32 & uint64(rand.Uint32()) //return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
return uint64(rand.Uint32())
} }
// This file implements the Routing interface for the IpfsDHT struct. // This file implements the Routing interface for the IpfsDHT struct.
...@@ -116,6 +117,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, ...@@ -116,6 +117,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
mes := swarm.NewMessage(p, pmes.ToProtobuf()) mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id) listen_chan := s.ListenFor(pmes.Id)
u.DOut("Find providers for: '%s'", key)
s.network.Chan.Outgoing <-mes s.network.Chan.Outgoing <-mes
after := time.After(timeout) after := time.After(timeout)
select { select {
...@@ -123,37 +125,39 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, ...@@ -123,37 +125,39 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
s.Unlisten(pmes.Id) s.Unlisten(pmes.Id)
return nil, u.ErrTimeout return nil, u.ErrTimeout
case resp := <-listen_chan: case resp := <-listen_chan:
u.DOut("FindProviders: got response.")
pmes_out := new(DHTMessage) pmes_out := new(DHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out) err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var addrs map[string]string var addrs map[u.Key]string
err := json.Unmarshal(pmes_out.GetValue(), &addrs) err = json.Unmarshal(pmes_out.GetValue(), &addrs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for key,addr := range addrs { var prov_arr []*peer.Peer
p := s.network.Find(u.Key(key)) for pid,addr := range addrs {
p := s.network.Find(pid)
if p == nil { if p == nil {
maddr,err := ma.NewMultiaddr(addr) maddr,err := ma.NewMultiaddr(addr)
if err != nil { if err != nil {
u.PErr("error connecting to new peer: %s", err) u.PErr("error connecting to new peer: %s", err)
continue continue
} }
p, err := s.Connect(maddr) p, err = s.Connect(maddr)
if err != nil { if err != nil {
u.PErr("error connecting to new peer: %s", err) u.PErr("error connecting to new peer: %s", err)
continue continue
} }
} }
s.providerLock.Lock() s.addProviderEntry(key, p)
prov_arr := s.providers[key] prov_arr = append(prov_arr, p)
s.providers[key] = append(prov_arr, p)
s.providerLock.Unlock()
} }
return prov_arr, nil
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment