Commit 29457214 authored by Brian Tiger Chow

refactor(dht/pb) move proto to pb package

parent 2ce41870
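The change is mechanical: the protobuf Message type and its helpers move out of package dht into the new dht_pb package (imported as pb throughout the diff), and the constructors other files rely on are exported (newMessage becomes pb.NewMessage, peersToPBPeers becomes pb.PeersToPBPeers). A minimal sketch of what calling code looks like after the move, using only identifiers that appear in this diff and assuming it is compiled inside the go-ipfs tree so the imports resolve:

package main

import (
    "fmt"

    peer "github.com/jbenet/go-ipfs/peer"
    pb "github.com/jbenet/go-ipfs/routing/dht/pb"
)

func main() {
    // Build a GET_VALUE request through the now-exported constructor.
    pmes := pb.NewMessage(pb.Message_GET_VALUE, "some-key", 0)

    // Provider peers are converted with the exported helper (empty here).
    var provs []peer.Peer
    pmes.ProviderPeers = pb.PeersToPBPeers(provs)

    fmt.Println(pmes.GetType(), pmes.GetKey())
}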
@@ -11,6 +11,7 @@ import (
inet "github.com/jbenet/go-ipfs/net"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"
@@ -128,7 +129,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
}
// deserialize msg
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mData, pmes)
if err != nil {
log.Error("Error unmarshaling data")
@@ -140,7 +141,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
// Print out diagnostic
log.Debugf("%s got message type: '%s' from %s",
dht.self, Message_MessageType_name[int32(pmes.GetType())], mPeer)
dht.self, pb.Message_MessageType_name[int32(pmes.GetType())], mPeer)
// get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType())
@@ -174,7 +175,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
mes, err := msg.FromObject(p, pmes)
if err != nil {
@@ -185,7 +186,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
// Print out diagnostic
log.Debugf("Sent message type: '%s' to %s",
Message_MessageType_name[int32(pmes.GetType())], p)
pb.Message_MessageType_name[int32(pmes.GetType())], p)
rmes, err := dht.sender.SendRequest(ctx, mes)
if err != nil {
@@ -198,7 +199,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
rtt := time.Since(start)
rmes.Peer().SetLatency(rtt)
rpmes := new(Message)
rpmes := new(pb.Message)
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
return nil, err
}
@@ -210,7 +211,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
key string, value []byte) error {
pmes := newMessage(Message_PUT_VALUE, string(key), 0)
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
pmes.Value = value
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
@@ -225,10 +226,10 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {
pmes := newMessage(Message_ADD_PROVIDER, string(key), 0)
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
// add self as the provider
pmes.ProviderPeers = peersToPBPeers([]peer.Peer{dht.self})
pmes.ProviderPeers = pb.PeersToPBPeers([]peer.Peer{dht.self})
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
@@ -290,9 +291,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
key u.Key, level int) (*Message, error) {
key u.Key, level int) (*pb.Message, error) {
pmes := newMessage(Message_GET_VALUE, string(key), level)
pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}
@@ -301,7 +302,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
peerlist []*Message_Peer, level int) ([]byte, error) {
peerlist []*pb.Message_Peer, level int) ([]byte, error) {
for _, pinfo := range peerlist {
p, err := dht.ensureConnectedToPeer(pinfo)
@@ -379,17 +380,17 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
return nil, nil
}
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*Message, error) {
pmes := newMessage(Message_FIND_NODE, string(id), level)
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level)
return dht.sendRequest(ctx, p, pmes)
}
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) {
pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}
func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
func (dht *IpfsDHT) addProviders(key u.Key, peers []*pb.Message_Peer) []peer.Peer {
var provArr []peer.Peer
for _, prov := range peers {
p, err := dht.peerFromInfo(prov)
......@@ -413,7 +414,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
}
// nearestPeersToQuery returns the routing table's closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
level := pmes.GetClusterLevel()
cluster := dht.routingTables[level]
......@@ -423,7 +424,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
}
// betterPeersToQuery returns nearestPeersToQuery, but only if closer than self.
func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []peer.Peer {
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
closer := dht.nearestPeersToQuery(pmes, count)
// no node? nil
......@@ -462,7 +463,7 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
return p, nil
}
func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {
id := peer.ID(pbp.GetId())
@@ -485,7 +486,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
return p, nil
}
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (peer.Peer, error) {
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *pb.Message_Peer) (peer.Peer, error) {
p, err := dht.peerFromInfo(pbp)
if err != nil {
return nil, err
......
@@ -12,6 +12,7 @@ import (
msg "github.com/jbenet/go-ipfs/net/message"
mux "github.com/jbenet/go-ipfs/net/mux"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
u "github.com/jbenet/go-ipfs/util"
"time"
@@ -127,13 +128,13 @@ func TestGetFailures(t *testing.T) {
// u.POut("NotFound Test\n")
// Reply with failures to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}
resp := &Message{
resp := &pb.Message{
Type: pmes.Type,
}
m, err := msg.FromObject(mes.Peer(), resp)
@@ -153,9 +154,9 @@ func TestGetFailures(t *testing.T) {
fs.handlers = nil
// Now we test this DHT's handleGetValue failure
typ := Message_GET_VALUE
typ := pb.Message_GET_VALUE
str := "hello"
req := Message{
req := pb.Message{
Type: &typ,
Key: &str,
Value: []byte{0},
@@ -169,7 +170,7 @@ func TestGetFailures(t *testing.T) {
mes = d.HandleMessage(ctx, mes)
pmes := new(Message)
pmes := new(pb.Message)
err = proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
@@ -215,21 +216,21 @@ func TestNotFound(t *testing.T) {
// Reply with random peers to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}
switch pmes.GetType() {
case Message_GET_VALUE:
resp := &Message{Type: pmes.Type}
case pb.Message_GET_VALUE:
resp := &pb.Message{Type: pmes.Type}
peers := []peer.Peer{}
for i := 0; i < 7; i++ {
peers = append(peers, _randPeer())
}
resp.CloserPeers = peersToPBPeers(peers)
resp.CloserPeers = pb.PeersToPBPeers(peers)
mes, err := msg.FromObject(mes.Peer(), resp)
if err != nil {
t.Error(err)
@@ -282,17 +283,17 @@ func TestLessThanKResponses(t *testing.T) {
// Reply with random peers to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}
switch pmes.GetType() {
case Message_GET_VALUE:
resp := &Message{
case pb.Message_GET_VALUE:
resp := &pb.Message{
Type: pmes.Type,
CloserPeers: peersToPBPeers([]peer.Peer{other}),
CloserPeers: pb.PeersToPBPeers([]peer.Peer{other}),
}
mes, err := msg.FromObject(mes.Peer(), resp)
......
@@ -6,6 +6,7 @@ import (
"time"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
u "github.com/jbenet/go-ipfs/util"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
@@ -14,32 +15,32 @@ import (
var CloserPeerCount = 4
// dhtHandler specifies the signature of functions that handle DHT messages.
type dhtHandler func(peer.Peer, *Message) (*Message, error)
type dhtHandler func(peer.Peer, *pb.Message) (*pb.Message, error)
func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
switch t {
case Message_GET_VALUE:
case pb.Message_GET_VALUE:
return dht.handleGetValue
case Message_PUT_VALUE:
case pb.Message_PUT_VALUE:
return dht.handlePutValue
case Message_FIND_NODE:
case pb.Message_FIND_NODE:
return dht.handleFindPeer
case Message_ADD_PROVIDER:
case pb.Message_ADD_PROVIDER:
return dht.handleAddProvider
case Message_GET_PROVIDERS:
case pb.Message_GET_PROVIDERS:
return dht.handleGetProviders
case Message_PING:
case pb.Message_PING:
return dht.handlePing
default:
return nil
}
}
func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())
// setup response
resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
// first, is the key even a key?
key := pmes.GetKey()
@@ -77,7 +78,7 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error)
provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
if len(provs) > 0 {
log.Debugf("handleGetValue returning %d provider[s]", len(provs))
resp.ProviderPeers = peersToPBPeers(provs)
resp.ProviderPeers = pb.PeersToPBPeers(provs)
}
// Find closest peer on given cluster to desired key and reply with that info
@@ -89,14 +90,14 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error)
log.Critical("no addresses on peer being sent!")
}
}
resp.CloserPeers = peersToPBPeers(closer)
resp.CloserPeers = pb.PeersToPBPeers(closer)
}
return resp, nil
}
// Store a value in this peer's local storage
func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
dht.dslock.Lock()
defer dht.dslock.Unlock()
dskey := u.Key(pmes.GetKey()).DsKey()
@@ -105,13 +106,13 @@ func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *Message) (*Message, error)
return pmes, err
}
func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
return pmes, nil
}
func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *Message) (*Message, error) {
resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel())
func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
var closest []peer.Peer
// if looking for self... special case where we send it on CloserPeers.
@@ -136,12 +137,12 @@ func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *Message) (*Message, error)
for _, p := range withAddresses {
log.Debugf("handleFindPeer: sending back '%s'", p)
}
resp.CloserPeers = peersToPBPeers(withAddresses)
resp.CloserPeers = pb.PeersToPBPeers(withAddresses)
return resp, nil
}
func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *Message) (*Message, error) {
resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
// check if we have this value, to add ourselves as provider.
log.Debugf("handling GetProviders: '%s'", pmes.GetKey())
@@ -160,13 +161,13 @@ func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *Message) (*Message, er
// if we've got providers, send those.
if providers != nil && len(providers) > 0 {
resp.ProviderPeers = peersToPBPeers(providers)
resp.ProviderPeers = pb.PeersToPBPeers(providers)
}
// Also send closer peers.
closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
if closer != nil {
resp.CloserPeers = peersToPBPeers(closer)
resp.CloserPeers = pb.PeersToPBPeers(closer)
}
return resp, nil
@@ -177,7 +178,7 @@ type providerInfo struct {
Value peer.Peer
}
func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
key := u.Key(pmes.GetKey())
log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))
......
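The handlers in handlers.go keep their shape; only the types move: handlerForMsgType now switches on pb.Message_MessageType, and every handler takes and returns *pb.Message via the dhtHandler type. A hedged sketch of routing one decoded request through that table, mirroring what HandleMessage does earlier in the diff; the dispatch helper and its fmt.Errorf are illustrative additions (not in the repository) assumed to live inside the dht package next to these handlers:

// dispatch is a hypothetical helper: look up the handler for the decoded
// pb.Message's type and invoke it with the new *pb.Message signature.
func (dht *IpfsDHT) dispatch(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
    handler := dht.handlerForMsgType(pmes.GetType())
    if handler == nil {
        return nil, fmt.Errorf("no handler for message type %v", pmes.GetType())
    }
    return handler(p, pmes)
}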
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --gogo_out=. --proto_path=../../../../:/usr/local/opt/protobuf/include:. $<
protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. $<
clean:
rm *.pb.go
rm -f *.pb.go
rm -f *.go
// Code generated by protoc-gen-go.
// source: messages.proto
// Code generated by protoc-gen-gogo.
// source: dht.proto
// DO NOT EDIT!
/*
Package dht is a generated protocol buffer package.
Package dht_pb is a generated protocol buffer package.
It is generated from these files:
messages.proto
dht.proto
It has these top-level messages:
Message
*/
package dht
package dht_pb
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
@@ -67,7 +67,7 @@ func (x *Message_MessageType) UnmarshalJSON(data []byte) error {
type Message struct {
// defines what type of message it is.
Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=dht.Message_MessageType" json:"type,omitempty"`
Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=dht.pb.Message_MessageType" json:"type,omitempty"`
// defines what coral cluster level this query/response belongs to.
ClusterLevelRaw *int32 `protobuf:"varint,10,opt,name=clusterLevelRaw" json:"clusterLevelRaw,omitempty"`
// Used to specify the key associated with this message.
@@ -156,5 +156,5 @@ func (m *Message_Peer) GetAddr() string {
}
func init() {
proto.RegisterEnum("dht.Message_MessageType", Message_MessageType_name, Message_MessageType_value)
proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value)
}
package dht;
package dht.pb;
//run `protoc --go_out=. *.proto` to generate
......
package dht
package dht_pb
import (
"errors"
@@ -8,7 +8,7 @@ import (
peer "github.com/jbenet/go-ipfs/peer"
)
func newMessage(typ Message_MessageType, key string, level int) *Message {
func NewMessage(typ Message_MessageType, key string, level int) *Message {
m := &Message{
Type: &typ,
Key: &key,
@@ -31,7 +31,7 @@ func peerToPBPeer(p peer.Peer) *Message_Peer {
return pbp
}
func peersToPBPeers(peers []peer.Peer) []*Message_Peer {
func PeersToPBPeers(peers []peer.Peer) []*Message_Peer {
pbpeers := make([]*Message_Peer, len(peers))
for i, p := range peers {
pbpeers[i] = peerToPBPeer(p)
@@ -53,8 +53,7 @@ func (m *Message_Peer) Address() (ma.Multiaddr, error) {
func (m *Message) GetClusterLevel() int {
level := m.GetClusterLevelRaw() - 1
if level < 0 {
log.Debug("GetClusterLevel: no routing level specified, assuming 0")
level = 0
return 0
}
return int(level)
}
......
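The only behavioral tweak beyond the renames is in GetClusterLevel: the old body logged a debug message and fell through with level = 0, while the new body returns 0 directly, so callers see the same clamping, just without the log line. A small hedged illustration of that clamping; clusterLevelOf is not in the repository, it merely restates the getter shown above:

// clusterLevelOf mirrors pb.Message.GetClusterLevel: the getter reads the raw
// field off by one, and anything unset or below 1 clamps to level 0.
func clusterLevelOf(m *pb.Message) int {
    level := m.GetClusterLevelRaw() - 1
    if level < 0 {
        return 0
    }
    return int(level)
}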
@@ -6,6 +6,7 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"
)
@@ -152,10 +153,10 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
return peerOut
}
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
done := make(chan struct{})
for _, pbp := range peers {
go func(mp *Message_Peer) {
go func(mp *pb.Message_Peer) {
defer func() { done <- struct{}{} }()
// construct new peer
p, err := dht.ensureConnectedToPeer(mp)
......@@ -258,7 +259,7 @@ func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
log.Infof("ping %s start", p)
pmes := newMessage(Message_PING, "", 0)
pmes := pb.NewMessage(pb.Message_PING, "", 0)
_, err := dht.sendRequest(ctx, p, pmes)
log.Infof("ping %s end (err = %s)", p, err)
return err
......