Commit 01ca93b4 authored by Jeromy's avatar Jeromy

fixed small bug introduced during race condition frustration

parent c22b6aa3
......@@ -3,6 +3,7 @@ package dht
import (
"sync"
"time"
"bytes"
"encoding/json"
peer "github.com/jbenet/go-ipfs/peer"
......@@ -22,7 +23,7 @@ import (
// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
routes *RoutingTable
routes []*RoutingTable
network *swarm.Swarm
......@@ -38,7 +39,7 @@ type IpfsDHT struct {
providerLock sync.RWMutex
// map of channels waiting for reply messages
listeners map[uint64]chan *swarm.Message
listeners map[uint64]*listenInfo
listenLock sync.RWMutex
// Signal to shutdown dht
......@@ -46,6 +47,14 @@ type IpfsDHT struct {
// When this peer started up
birth time.Time
//lock to make diagnostics work better
diaglock sync.Mutex
}
type listenInfo struct {
resp chan *swarm.Message
count int
}
// Create a new DHT object with the given peer as the 'local' host
......@@ -63,10 +72,11 @@ func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
dht.network = network
dht.datastore = ds.NewMapDatastore()
dht.self = p
dht.listeners = make(map[uint64]chan *swarm.Message)
dht.listeners = make(map[uint64]*listenInfo)
dht.providers = make(map[u.Key][]*providerInfo)
dht.shutdown = make(chan struct{})
dht.routes = NewRoutingTable(20, convertPeerID(p.ID))
dht.routes = make([]*RoutingTable, 1)
dht.routes[0] = NewRoutingTable(20, convertPeerID(p.ID))
dht.birth = time.Now()
return dht, nil
}
......@@ -106,7 +116,7 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
dht.network.StartConn(conn)
removed := dht.routes.Update(peer)
removed := dht.routes[0].Update(peer)
if removed != nil {
panic("need to remove this peer.")
}
......@@ -142,7 +152,7 @@ func (dht *IpfsDHT) handleMessages() {
}
// Update peers latest visit in routing table
removed := dht.routes.Update(mes.Peer)
removed := dht.routes[0].Update(mes.Peer)
if removed != nil {
panic("Need to handle removed peer.")
}
......@@ -150,10 +160,15 @@ func (dht *IpfsDHT) handleMessages() {
// Note: not sure if this is the correct place for this
if pmes.GetResponse() {
dht.listenLock.RLock()
ch, ok := dht.listeners[pmes.GetId()]
list, ok := dht.listeners[pmes.GetId()]
if list.count > 1 {
list.count--
} else if list.count == 1 {
delete(dht.listeners, pmes.GetId())
}
dht.listenLock.RUnlock()
if ok {
ch <- mes
list.resp <- mes
} else {
// this is expected behaviour during a timeout
u.DOut("Received response with nobody listening...")
......@@ -181,7 +196,7 @@ func (dht *IpfsDHT) handleMessages() {
case DHTMessage_PING:
dht.handlePing(mes.Peer, pmes)
case DHTMessage_DIAGNOSTIC:
// TODO: network diagnostic messages
dht.handleDiagnostic(mes.Peer, pmes)
}
case err := <-dht.network.Chan.Errors:
......@@ -220,7 +235,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
}
} else if err == ds.ErrNotFound {
// Find closest peer(s) to desired key and reply with that info
closer := dht.routes.NearestPeer(convertKey(u.Key(pmes.GetKey())))
closer := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
resp = &pDHTMessage{
Response: true,
Id: *pmes.Id,
......@@ -256,7 +271,7 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *DHTMessage) {
u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
closest := dht.routes.NearestPeer(convertKey(u.Key(pmes.GetKey())))
closest := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
if closest == nil {
panic("could not find anything.")
}
......@@ -336,10 +351,10 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
func (dht *IpfsDHT) ListenFor(mesid uint64, count int) <-chan *swarm.Message {
lchan := make(chan *swarm.Message)
dht.listenLock.Lock()
dht.listeners[mesid] = lchan
dht.listeners[mesid] = &listenInfo{lchan, count}
dht.listenLock.Unlock()
return lchan
}
......@@ -347,12 +362,19 @@ func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
// Unregister the given message id from the listener map
func (dht *IpfsDHT) Unlisten(mesid uint64) {
dht.listenLock.Lock()
ch, ok := dht.listeners[mesid]
list, ok := dht.listeners[mesid]
if ok {
delete(dht.listeners, mesid)
}
dht.listenLock.Unlock()
close(ch)
close(list.resp)
}
func (dht *IpfsDHT) IsListening(mesid uint64) bool {
dht.listenLock.RLock()
_,ok := dht.listeners[mesid]
dht.listenLock.RUnlock()
return ok
}
// Stop all communications from this peer and shut down
......@@ -368,3 +390,51 @@ func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
dht.providers[key] = append(provs, &providerInfo{time.Now(), p})
dht.providerLock.Unlock()
}
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
dht.diaglock.Lock()
if dht.IsListening(pmes.GetId()) {
//TODO: ehhh..........
dht.diaglock.Unlock()
return
}
dht.diaglock.Unlock()
seq := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)
listen_chan := dht.ListenFor(pmes.GetId(), len(seq))
for _,ps := range seq {
mes := swarm.NewMessage(ps, pmes)
dht.network.Chan.Outgoing <-mes
}
buf := new(bytes.Buffer)
// NOTE: this shouldnt be a hardcoded value
after := time.After(time.Second * 20)
count := len(seq)
for count > 0 {
select {
case <-after:
//Timeout, return what we have
goto out
case req_resp := <-listen_chan:
buf.Write(req_resp.Data)
count--
}
}
out:
di := dht.getDiagInfo()
buf.Write(di.Marshal())
resp := pDHTMessage{
Type: DHTMessage_DIAGNOSTIC,
Id: pmes.GetId(),
Value: buf.Bytes(),
Response: true,
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <-mes
}
......@@ -3,6 +3,7 @@ package dht
import (
"math/rand"
"time"
"bytes"
"encoding/json"
proto "code.google.com/p/goprotobuf/proto"
......@@ -30,7 +31,7 @@ func GenerateMessageID() uint64 {
// PutValue adds value corresponding to given Key.
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
var p *peer.Peer
p = s.routes.NearestPeer(convertKey(key))
p = s.routes[0].NearestPeer(convertKey(key))
if p == nil {
panic("Table returned nil peer!")
}
......@@ -52,7 +53,7 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
// returned along with util.ErrSearchIncomplete
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
var p *peer.Peer
p = s.routes.NearestPeer(convertKey(key))
p = s.routes[0].NearestPeer(convertKey(key))
if p == nil {
panic("Table returned nil peer!")
}
......@@ -62,7 +63,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
Key: string(key),
Id: GenerateMessageID(),
}
response_chan := s.ListenFor(pmes.Id)
response_chan := s.ListenFor(pmes.Id, 1)
mes := swarm.NewMessage(p, pmes.ToProtobuf())
s.network.Chan.Outgoing <- mes
......@@ -92,7 +93,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
// Announce that this node can provide value for given key
func (s *IpfsDHT) Provide(key u.Key) error {
peers := s.routes.NearestPeers(convertKey(key), PoolSize)
peers := s.routes[0].NearestPeers(convertKey(key), PoolSize)
if len(peers) == 0 {
//return an error
}
......@@ -112,7 +113,7 @@ func (s *IpfsDHT) Provide(key u.Key) error {
// FindProviders searches for peers who can provide the value for given key.
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
p := s.routes.NearestPeer(convertKey(key))
p := s.routes[0].NearestPeer(convertKey(key))
pmes := pDHTMessage{
Type: DHTMessage_GET_PROVIDERS,
......@@ -122,7 +123,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id)
listen_chan := s.ListenFor(pmes.Id, 1)
u.DOut("Find providers for: '%s'", key)
s.network.Chan.Outgoing <-mes
after := time.After(timeout)
......@@ -163,7 +164,6 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
}
return prov_arr, nil
}
}
......@@ -171,7 +171,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
// FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
p := s.routes.NearestPeer(convertPeerID(id))
p := s.routes[0].NearestPeer(convertPeerID(id))
pmes := pDHTMessage{
Type: DHTMessage_FIND_NODE,
......@@ -181,7 +181,7 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id)
listen_chan := s.ListenFor(pmes.Id, 1)
s.network.Chan.Outgoing <-mes
after := time.After(timeout)
select {
......@@ -224,7 +224,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
mes := swarm.NewMessage(p, pmes.ToProtobuf())
before := time.Now()
response_chan := dht.ListenFor(pmes.Id)
response_chan := dht.ListenFor(pmes.Id, 1)
dht.network.Chan.Outgoing <- mes
tout := time.After(timeout)
......@@ -241,3 +241,54 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
return u.ErrTimeout
}
}
func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
u.DOut("Begin Diagnostic")
//Send to N closest peers
targets := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)
// TODO: Add timeout to this struct so nodes know when to return
pmes := pDHTMessage{
Type: DHTMessage_DIAGNOSTIC,
Id: GenerateMessageID(),
}
listen_chan := dht.ListenFor(pmes.Id, len(targets))
pbmes := pmes.ToProtobuf()
for _,p := range targets {
mes := swarm.NewMessage(p, pbmes)
dht.network.Chan.Outgoing <-mes
}
var out []*diagInfo
after := time.After(timeout)
for count := len(targets); count > 0; {
select {
case <-after:
u.DOut("Diagnostic request timed out.")
return out, u.ErrTimeout
case resp := <-listen_chan:
pmes_out := new(DHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
// NOTE: here and elsewhere, need to audit error handling,
// some errors should be continued on from
return out, err
}
dec := json.NewDecoder(bytes.NewBuffer(pmes_out.GetValue()))
for {
di := new(diagInfo)
err := dec.Decode(di)
if err != nil {
break
}
out = append(out, di)
}
}
}
return nil,nil
}
......@@ -95,11 +95,7 @@ func (p peerSorterArr) Less(a, b int) bool {
//
func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
if peerList == nil {
return peerSorterArr{}
}
e := peerList.Front()
for ; e != nil; {
for e := peerList.Front(); e != nil; e = e.Next() {
p := e.Value.(*peer.Peer)
p_id := convertPeerID(p.ID)
pd := peerDistance{
......@@ -107,11 +103,10 @@ func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) pe
distance: xor(target, p_id),
}
peerArr = append(peerArr, &pd)
if e != nil {
if e == nil {
u.POut("list element was nil.")
return peerArr
}
e = e.Next()
}
return peerArr
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment