Commit 9dbba5c6 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

some dht cleanup, and make DHTs take a master context

parent 7b99fde6
......@@ -53,16 +53,19 @@ type IpfsDHT struct {
//lock to make diagnostics work better
diaglock sync.Mutex
ctx context.Context
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
dht := new(IpfsDHT)
dht.network = net
dht.sender = sender
dht.datastore = dstore
dht.self = p
dht.peerstore = ps
dht.ctx = ctx
dht.providers = NewProviderManager(p.ID)
......@@ -71,6 +74,8 @@ func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sende
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*1000)
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
dht.birth = time.Now()
go dht.PingRoutine(time.Second * 10)
return dht
}
......@@ -137,7 +142,6 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
// get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType())
if handler == nil {
// TODO handle/log err
log.Error("got back nil handler from handlerForMsgType")
return nil
}
......@@ -350,7 +354,7 @@ func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
byt, ok := v.([]byte)
if !ok {
return byt, errors.New("value stored in datastore not []byte")
return nil, errors.New("value stored in datastore not []byte")
}
return byt, nil
}
......@@ -533,6 +537,27 @@ func (dht *IpfsDHT) loadProvidableKeys() error {
return nil
}
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
tick := time.Tick(t)
for {
select {
case <-tick:
id := make([]byte, 16)
rand.Read(id)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5)
for _, p := range peers {
ctx, _ := context.WithTimeout(dht.ctx, time.Second*5)
err := dht.Ping(ctx, p)
if err != nil {
log.Error("Ping error: %s", err)
}
}
case <-dht.ctx.Done():
return
}
}
}
// Bootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
id := make([]byte, 16)
......
......@@ -2,6 +2,7 @@ package dht
import (
"encoding/json"
"fmt"
"time"
)
......@@ -29,12 +30,16 @@ func (l *logDhtRPC) EndLog() {
func (l *logDhtRPC) Print() {
b, err := json.Marshal(l)
if err != nil {
log.Debug(err.Error())
log.Debug("Error marshaling logDhtRPC object: %s", err)
} else {
log.Debug(string(b))
}
}
func (l *logDhtRPC) String() string {
return fmt.Sprintf("DHT RPC: %s took %s, success = %s", l.Type, l.Duration, l.Success)
}
func (l *logDhtRPC) EndAndPrint() {
l.EndLog()
l.Print()
......
......@@ -5,9 +5,7 @@ import (
"fmt"
"time"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
......@@ -32,8 +30,6 @@ func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
return dht.handleGetProviders
case Message_PING:
return dht.handlePing
case Message_DIAGNOSTIC:
return dht.handleDiagnostic
default:
return nil
}
......@@ -211,53 +207,3 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, er
func (dht *IpfsDHT) Halt() {
dht.providers.Halt()
}
// NOTE: not yet finished, low priority
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) {
seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
for _, ps := range seq {
_, err := msg.FromObject(ps, pmes)
if err != nil {
log.Error("handleDiagnostics error creating message: %v\n", err)
continue
}
// dht.sender.SendRequest(context.TODO(), mes)
}
return nil, errors.New("not yet ported back")
// buf := new(bytes.Buffer)
// di := dht.getDiagInfo()
// buf.Write(di.Marshal())
//
// // NOTE: this shouldnt be a hardcoded value
// after := time.After(time.Second * 20)
// count := len(seq)
// for count > 0 {
// select {
// case <-after:
// //Timeout, return what we have
// goto out
// case reqResp := <-listenChan:
// pmesOut := new(Message)
// err := proto.Unmarshal(reqResp.Data, pmesOut)
// if err != nil {
// // It broke? eh, whatever, keep going
// continue
// }
// buf.Write(reqResp.Data)
// count--
// }
// }
//
// out:
// resp := Message{
// Type: Message_DIAGNOSTIC,
// ID: pmes.GetId(),
// Value: buf.Bytes(),
// Response: true,
// }
//
// mes := swarm.NewMessage(p, resp.ToProtobuf())
// dht.netChan.Outgoing <- mes
}
// Code generated by protoc-gen-gogo.
// Code generated by protoc-gen-go.
// source: messages.proto
// DO NOT EDIT!
......@@ -13,13 +13,11 @@ It has these top-level messages:
*/
package dht
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import json "encoding/json"
import proto "code.google.com/p/goprotobuf/proto"
import math "math"
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
type Message_MessageType int32
......@@ -31,7 +29,6 @@ const (
Message_GET_PROVIDERS Message_MessageType = 3
Message_FIND_NODE Message_MessageType = 4
Message_PING Message_MessageType = 5
Message_DIAGNOSTIC Message_MessageType = 6
)
var Message_MessageType_name = map[int32]string{
......@@ -41,7 +38,6 @@ var Message_MessageType_name = map[int32]string{
3: "GET_PROVIDERS",
4: "FIND_NODE",
5: "PING",
6: "DIAGNOSTIC",
}
var Message_MessageType_value = map[string]int32{
"PUT_VALUE": 0,
......@@ -50,7 +46,6 @@ var Message_MessageType_value = map[string]int32{
"GET_PROVIDERS": 3,
"FIND_NODE": 4,
"PING": 5,
"DIAGNOSTIC": 6,
}
func (x Message_MessageType) Enum() *Message_MessageType {
......@@ -72,7 +67,7 @@ func (x *Message_MessageType) UnmarshalJSON(data []byte) error {
type Message struct {
// defines what type of message it is.
Type *Message_MessageType `protobuf:"varint,1,req,name=type,enum=dht.Message_MessageType" json:"type,omitempty"`
Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=dht.Message_MessageType" json:"type,omitempty"`
// defines what coral cluster level this query/response belongs to.
ClusterLevelRaw *int32 `protobuf:"varint,10,opt,name=clusterLevelRaw" json:"clusterLevelRaw,omitempty"`
// Used to specify the key associated with this message.
......@@ -137,8 +132,8 @@ func (m *Message) GetProviderPeers() []*Message_Peer {
}
type Message_Peer struct {
Id *string `protobuf:"bytes,1,req,name=id" json:"id,omitempty"`
Addr *string `protobuf:"bytes,2,req,name=addr" json:"addr,omitempty"`
Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
Addr *string `protobuf:"bytes,2,opt,name=addr" json:"addr,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
......
......@@ -10,16 +10,15 @@ message Message {
GET_PROVIDERS = 3;
FIND_NODE = 4;
PING = 5;
DIAGNOSTIC = 6;
}
message Peer {
required string id = 1;
required string addr = 2;
optional string id = 1;
optional string addr = 2;
}
// defines what type of message it is.
required MessageType type = 1;
optional MessageType type = 1;
// defines what coral cluster level this query/response belongs to.
optional int32 clusterLevelRaw = 10;
......
package dht
import (
"bytes"
"encoding/json"
"sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
......@@ -62,6 +60,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
routeLevel := 0
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize)
if closest == nil || len(closest) == 0 {
log.Warning("Got no peers back from routing table!")
return nil, nil
}
......@@ -282,33 +281,3 @@ func (dht *IpfsDHT) Ping(ctx context.Context, p *peer.Peer) error {
log.Info("ping %s end (err = %s)", p, err)
return err
}
func (dht *IpfsDHT) getDiagnostic(ctx context.Context) ([]*diagInfo, error) {
log.Info("Begin Diagnostic")
peers := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
var out []*diagInfo
query := newQuery(dht.self.Key(), func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
pmes := newMessage(Message_DIAGNOSTIC, "", 0)
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
return nil, err
}
dec := json.NewDecoder(bytes.NewBuffer(rpmes.GetValue()))
for {
di := new(diagInfo)
err := dec.Decode(di)
if err != nil {
break
}
out = append(out, di)
}
return &dhtQueryResult{success: true}, nil
})
_, err := query.Run(ctx, peers)
return out, err
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment