Commit 334ea873 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

fix a few race conditions and add in newlines to print statements

parent 320f5272
......@@ -34,6 +34,7 @@ type IpfsDHT struct {
// Local data
datastore ds.Datastore
dslock sync.Mutex
// Map keys to peers that can provide their value
providers map[u.Key][]*providerInfo
......@@ -78,7 +79,7 @@ func (dht *IpfsDHT) Start() {
// Connect to a new peer at the given address, ping and add to the routing table
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
maddrstr, _ := addr.String()
u.DOut("Connect to new peer: %s", maddrstr)
u.DOut("Connect to new peer: %s\n", maddrstr)
npeer, err := dht.network.ConnectNew(addr)
if err != nil {
return nil, err
......@@ -113,7 +114,7 @@ func (dht *IpfsDHT) handleMessages() {
pmes := new(PBDHTMessage)
err := proto.Unmarshal(mes.Data, pmes)
if err != nil {
u.PErr("Failed to decode protobuf message: %s", err)
u.PErr("Failed to decode protobuf message: %s\n", err)
continue
}
......@@ -126,7 +127,7 @@ func (dht *IpfsDHT) handleMessages() {
}
//
u.DOut("[peer: %s]\nGot message type: '%s' [id = %x, from = %s]",
u.DOut("[peer: %s]\nGot message type: '%s' [id = %x, from = %s]\n",
dht.self.ID.Pretty(),
PBDHTMessage_MessageType_name[int32(pmes.GetType())],
pmes.GetId(), mes.Peer.ID.Pretty())
......@@ -148,7 +149,7 @@ func (dht *IpfsDHT) handleMessages() {
}
case err := <-ch.Errors:
u.PErr("dht err: %s", err)
u.PErr("dht err: %s\n", err)
case <-dht.shutdown:
checkTimeouts.Stop()
return
......@@ -187,7 +188,7 @@ func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) er
}
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
u.DOut("handleGetValue for key: %s", pmes.GetKey())
u.DOut("handleGetValue for key: %s\n", pmes.GetKey())
dskey := ds.NewKey(pmes.GetKey())
resp := &Message{
Response: true,
......@@ -201,9 +202,11 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
resp.Value = iVal.([]byte)
} else if err == ds.ErrNotFound {
// Check if we know any providers for the requested value
dht.providerLock.RLock()
provs, ok := dht.providers[u.Key(pmes.GetKey())]
dht.providerLock.RUnlock()
if ok && len(provs) > 0 {
u.DOut("handleGetValue returning %d provider[s]", len(provs))
u.DOut("handleGetValue returning %d provider[s]\n", len(provs))
for _, prov := range provs {
resp.Peers = append(resp.Peers, prov.Value)
}
......@@ -219,7 +222,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
} else {
level = int(pmes.GetValue()[0]) // Using value field to specify cluster level
}
u.DOut("handleGetValue searching level %d clusters", level)
u.DOut("handleGetValue searching level %d clusters\n", level)
closer := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
......@@ -233,7 +236,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
resp.Peers = nil
u.DOut("handleGetValue could not find a closer node than myself.")
} else {
u.DOut("handleGetValue returning a closer peer: '%s'", closer.ID.Pretty())
u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
resp.Peers = []*peer.Peer{closer}
}
}
......@@ -249,6 +252,8 @@ out:
// Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *PBDHTMessage) {
dht.dslock.Lock()
defer dht.dslock.Unlock()
dskey := ds.NewKey(pmes.GetKey())
err := dht.datastore.Put(dskey, pmes.GetValue())
if err != nil {
......@@ -278,7 +283,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
dht.network.Send(mes)
}()
level := pmes.GetValue()[0]
u.DOut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
u.DOut("handleFindPeer: searching for '%s'\n", peer.ID(pmes.GetKey()).Pretty())
closest := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
if closest == nil {
u.PErr("handleFindPeer: could not find anything.")
......@@ -295,7 +300,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
return
}
u.DOut("handleFindPeer: sending back '%s'", closest.ID.Pretty())
u.DOut("handleFindPeer: sending back '%s'\n", closest.ID.Pretty())
resp.Peers = []*peer.Peer{closest}
resp.Success = true
}
......@@ -352,7 +357,7 @@ func (dht *IpfsDHT) Halt() {
}
func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
u.DOut("Adding %s as provider for '%s'", p.Key().Pretty(), key)
u.DOut("Adding %s as provider for '%s'\n", p.Key().Pretty(), key)
dht.providerLock.Lock()
provs := dht.providers[key]
dht.providers[key] = append(provs, &providerInfo{time.Now(), p})
......@@ -432,13 +437,13 @@ func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Durati
}
addr, err := ma.NewMultiaddr(pb.GetAddr())
if err != nil {
u.PErr(err.Error())
u.PErr("%v\n", err.Error())
continue
}
np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr)
if err != nil {
u.PErr(err.Error())
u.PErr("%v\n", err.Error())
continue
}
......@@ -494,19 +499,19 @@ func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
if p == nil {
maddr, err := ma.NewMultiaddr(pinfo.GetAddr())
if err != nil {
u.PErr("getValue error: %s", err)
u.PErr("getValue error: %s\n", err)
continue
}
p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr)
if err != nil {
u.PErr("getValue error: %s", err)
u.PErr("getValue error: %s\n", err)
continue
}
}
pmes, err := dht.getValueSingle(p, key, timeout, level)
if err != nil {
u.DErr("getFromPeers error: %s", err)
u.DErr("getFromPeers error: %s\n", err)
continue
}
dht.addProviderEntry(key, p)
......@@ -520,6 +525,8 @@ func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
}
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
dht.dslock.Lock()
defer dht.dslock.Unlock()
v, err := dht.datastore.Get(ds.NewKey(string(key)))
if err != nil {
return nil, err
......@@ -637,15 +644,15 @@ func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer
// Dont add someone who is already on the list
p := dht.network.Find(u.Key(prov.GetId()))
if p == nil {
u.DOut("given provider %s was not in our network already.", peer.ID(prov.GetId()).Pretty())
u.DOut("given provider %s was not in our network already.\n", peer.ID(prov.GetId()).Pretty())
maddr, err := ma.NewMultiaddr(prov.GetAddr())
if err != nil {
u.PErr("error connecting to new peer: %s", err)
u.PErr("error connecting to new peer: %s\n", err)
continue
}
p, err = dht.network.GetConnection(peer.ID(prov.GetId()), maddr)
if err != nil {
u.PErr("error connecting to new peer: %s", err)
u.PErr("error connecting to new peer: %s\n", err)
continue
}
}
......
......@@ -30,8 +30,7 @@ var AlphaValue = 3
// GenerateMessageID creates and returns a new message ID
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
//return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
return uint64(rand.Uint32())
return (uint64(rand.Uint32()) << 32) | uint64(rand.Uint32())
}
// This file implements the Routing interface for the IpfsDHT struct.
......@@ -188,7 +187,7 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
}
val, peers, err := dht.getValueOrPeers(p, key, timeout/4, routeLevel)
if err != nil {
u.DErr(err.Error())
u.DErr("%v\n", err.Error())
c.Decrement()
continue
}
......@@ -254,7 +253,7 @@ func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Pee
ll.EndLog()
ll.Print()
}()
u.DOut("Find providers for: '%s'", key)
u.DOut("Find providers for: '%s'\n", key)
p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key))
if p == nil {
return nil, kb.ErrLookupFailure
......@@ -288,7 +287,7 @@ func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Pee
np, err := dht.network.GetConnection(peer.ID(closer[0].GetId()), maddr)
if err != nil {
u.PErr("[%s] Failed to connect to: %s", dht.self.ID.Pretty(), closer[0].GetAddr())
u.PErr("[%s] Failed to connect to: %s\n", dht.self.ID.Pretty(), closer[0].GetAddr())
level++
continue
}
......@@ -361,7 +360,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
case <-responseChan:
roundtrip := time.Since(before)
p.SetLatency(roundtrip)
u.DOut("Ping took %s.", roundtrip.String())
u.DOut("Ping took %s.\n", roundtrip.String())
return nil
case <-tout:
// Timed out, think about removing peer from network
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment