Commit 05158e5b authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Integrated new network into ipfs

parent 88a9bf04
......@@ -11,19 +11,17 @@ import (
"time"
inet "github.com/jbenet/go-ipfs/net"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
"github.com/jbenet/go-ipfs/util/eventlog"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
)
var log = eventlog.Logger("dht")
......@@ -35,50 +33,37 @@ const doPinging = false
// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
// Array of routing tables for differently distanced nodes
// NOTE: (currently, only a single table is used)
routingTable *kb.RoutingTable
// the network services we need
dialer inet.Dialer
sender inet.Sender
network inet.Network // the network services we need
self peer.Peer // Local peer (yourself)
peerstore peer.Peerstore // Other peers
// Local peer (yourself)
self peer.Peer
// Other peers
peerstore peer.Peerstore
// Local data
datastore ds.Datastore
datastore ds.Datastore // Local data
dslock sync.Mutex
providers *ProviderManager
// When this peer started up
birth time.Time
routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
providers *ProviderManager
//lock to make diagnostics work better
diaglock sync.Mutex
birth time.Time // When this peer started up
diaglock sync.Mutex // lock to make diagnostics work better
// record validator funcs
Validators map[string]ValidatorFunc
ctxc.ContextCloser
ctxgroup.ContextGroup
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dialer, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, n inet.Network, dstore ds.Datastore) *IpfsDHT {
dht := new(IpfsDHT)
dht.dialer = dialer
dht.sender = sender
dht.datastore = dstore
dht.self = p
dht.peerstore = ps
dht.ContextCloser = ctxc.NewContextCloser(ctx, nil)
dht.ContextGroup = ctxgroup.WithContext(ctx)
dht.network = n
n.SetHandler(inet.ProtocolDHT, dht.handleNewStream)
dht.providers = NewProviderManager(dht.Context(), p.ID())
dht.AddCloserChild(dht.providers)
dht.AddChildGroup(dht.providers)
dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Minute)
dht.birth = time.Now()
......@@ -95,7 +80,7 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
// Connect to a new peer at the given address, ping and add to the routing table
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) error {
err := dht.dialer.DialPeer(ctx, npeer)
err := dht.network.DialPeer(ctx, npeer)
if err != nil {
return err
}
......@@ -113,93 +98,6 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) error {
return nil
}
// HandleMessage implements the inet.Handler interface.
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage {
mData := mes.Data()
if mData == nil {
log.Error("Message contained nil data.")
return nil
}
mPeer := mes.Peer()
if mPeer == nil {
log.Error("Message contained nil peer.")
return nil
}
// deserialize msg
pmes := new(pb.Message)
err := proto.Unmarshal(mData, pmes)
if err != nil {
log.Error("Error unmarshaling data")
return nil
}
// update the peer (on valid msgs only)
dht.Update(ctx, mPeer)
log.Event(ctx, "foo", dht.self, mPeer, pmes)
// get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType())
if handler == nil {
log.Error("got back nil handler from handlerForMsgType")
return nil
}
// dispatch handler.
rpmes, err := handler(ctx, mPeer, pmes)
if err != nil {
log.Errorf("handle message error: %s", err)
return nil
}
// if nil response, return it before serializing
if rpmes == nil {
log.Warning("Got back nil response from request.")
return nil
}
// serialize response msg
rmes, err := msg.FromObject(mPeer, rpmes)
if err != nil {
log.Errorf("serialze response error: %s", err)
return nil
}
return rmes
}
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
mes, err := msg.FromObject(p, pmes)
if err != nil {
return nil, err
}
start := time.Now()
rmes, err := dht.sender.SendRequest(ctx, mes) // respect?
if err != nil {
return nil, err
}
if rmes == nil {
return nil, errors.New("no response to request")
}
log.Event(ctx, "sentMessage", dht.self, p, pmes)
rmes.Peer().SetLatency(time.Since(start))
rpmes := new(pb.Message)
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
return nil, err
}
return rpmes, nil
}
// putValueToNetwork stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
key string, rec *pb.Record) error {
......@@ -224,7 +122,7 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) er
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
// add self as the provider
pmes.ProviderPeers = pb.PeersToPBPeers(dht.dialer, []peer.Peer{dht.self})
pmes.ProviderPeers = pb.PeersToPBPeers(dht.network, []peer.Peer{dht.self})
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
......@@ -478,12 +376,12 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_P
return nil, err
}
if dht.dialer.LocalPeer().ID().Equal(p.ID()) {
if dht.self.ID().Equal(p.ID()) {
return nil, errors.New("attempting to ensure connection to self")
}
// dial connection
err = dht.dialer.DialPeer(ctx, p)
err = dht.network.DialPeer(ctx, p)
return p, err
}
......@@ -536,7 +434,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
if err != nil {
log.Errorf("Bootstrap peer error: %s", err)
}
err = dht.dialer.DialPeer(ctx, p)
err = dht.network.DialPeer(ctx, p)
if err != nil {
log.Errorf("Bootstrap peer error: %s", err)
}
......
......@@ -12,8 +12,6 @@ import (
ci "github.com/jbenet/go-ipfs/crypto"
inet "github.com/jbenet/go-ipfs/net"
mux "github.com/jbenet/go-ipfs/net/mux"
netservice "github.com/jbenet/go-ipfs/net/service"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil"
......@@ -25,16 +23,14 @@ import (
func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT {
peerstore := peer.NewPeerstore()
dhts := netservice.NewService(ctx, nil) // nil handler for now, need to patch it
net, err := inet.NewIpfsNetwork(ctx, p.Addresses(), p, peerstore, &mux.ProtocolMap{
mux.ProtocolID_Routing: dhts,
})
n, err := inet.NewNetwork(ctx, p.Addresses(), p, peerstore)
if err != nil {
t.Fatal(err)
}
d := NewDHT(ctx, p, peerstore, net, dhts, ds.NewMapDatastore())
dhts.SetHandler(d)
d := NewDHT(ctx, p, peerstore, n, ds.NewMapDatastore())
d.network.SetHandler(inet.ProtocolDHT, d.handleNewStream)
d.Validators["v"] = func(u.Key, []byte) error {
return nil
}
......@@ -107,8 +103,8 @@ func TestPing(t *testing.T) {
defer dhtA.Close()
defer dhtB.Close()
defer dhtA.dialer.(inet.Network).Close()
defer dhtB.dialer.(inet.Network).Close()
defer dhtA.network.Close()
defer dhtB.network.Close()
err = dhtA.Connect(ctx, peerB)
if err != nil {
......@@ -157,8 +153,8 @@ func TestValueGetSet(t *testing.T) {
defer dhtA.Close()
defer dhtB.Close()
defer dhtA.dialer.(inet.Network).Close()
defer dhtB.dialer.(inet.Network).Close()
defer dhtA.network.Close()
defer dhtB.network.Close()
err = dhtA.Connect(ctx, peerB)
if err != nil {
......@@ -199,7 +195,7 @@ func TestProvides(t *testing.T) {
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
defer dhts[i].dialer.(inet.Network).Close()
defer dhts[i].network.Close()
}
}()
......@@ -261,7 +257,7 @@ func TestProvidesAsync(t *testing.T) {
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
defer dhts[i].dialer.(inet.Network).Close()
defer dhts[i].network.Close()
}
}()
......@@ -326,7 +322,7 @@ func TestLayeredGet(t *testing.T) {
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
defer dhts[i].dialer.(inet.Network).Close()
defer dhts[i].network.Close()
}
}()
......@@ -381,7 +377,7 @@ func TestFindPeer(t *testing.T) {
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
dhts[i].dialer.(inet.Network).Close()
dhts[i].network.Close()
}
}()
......@@ -427,7 +423,7 @@ func TestFindPeersConnectedToPeer(t *testing.T) {
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
dhts[i].dialer.(inet.Network).Close()
dhts[i].network.Close()
}
}()
......@@ -566,8 +562,8 @@ func TestConnectCollision(t *testing.T) {
dhtA.Close()
dhtB.Close()
dhtA.dialer.(inet.Network).Close()
dhtB.dialer.(inet.Network).Close()
dhtA.network.Close()
dhtB.network.Close()
<-time.After(200 * time.Millisecond)
}
......
......@@ -9,8 +9,6 @@ import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
inet "github.com/jbenet/go-ipfs/net"
msg "github.com/jbenet/go-ipfs/net/message"
mux "github.com/jbenet/go-ipfs/net/mux"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
......
......@@ -87,7 +87,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.Peer, pmes *pb.Me
provs := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey()))
if len(provs) > 0 {
log.Debugf("handleGetValue returning %d provider[s]", len(provs))
resp.ProviderPeers = pb.PeersToPBPeers(dht.dialer, provs)
resp.ProviderPeers = pb.PeersToPBPeers(dht.network, provs)
}
// Find closest peer on given cluster to desired key and reply with that info
......@@ -99,7 +99,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.Peer, pmes *pb.Me
log.Critical("no addresses on peer being sent!")
}
}
resp.CloserPeers = pb.PeersToPBPeers(dht.dialer, closer)
resp.CloserPeers = pb.PeersToPBPeers(dht.network, closer)
}
return resp, nil
......@@ -160,7 +160,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.Peer, pmes *pb.Me
log.Debugf("handleFindPeer: sending back '%s'", p)
}
resp.CloserPeers = pb.PeersToPBPeers(dht.dialer, withAddresses)
resp.CloserPeers = pb.PeersToPBPeers(dht.network, withAddresses)
return resp, nil
}
......@@ -183,13 +183,13 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.Peer, pmes *p
}
if providers != nil && len(providers) > 0 {
resp.ProviderPeers = pb.PeersToPBPeers(dht.dialer, providers)
resp.ProviderPeers = pb.PeersToPBPeers(dht.network, providers)
}
// Also send closer peers.
closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
if closer != nil {
resp.CloserPeers = pb.PeersToPBPeers(dht.dialer, closer)
resp.CloserPeers = pb.PeersToPBPeers(dht.network, closer)
}
return resp, nil
......
......@@ -65,7 +65,7 @@ func RawPeersToPBPeers(peers []peer.Peer) []*Message_Peer {
// which can be written to a message and sent out. the key thing this function
// does (in addition to PeersToPBPeers) is set the ConnectionType with
// information from the given inet.Dialer.
func PeersToPBPeers(d inet.Dialer, peers []peer.Peer) []*Message_Peer {
func PeersToPBPeers(d inet.Network, peers []peer.Peer) []*Message_Peer {
pbps := RawPeersToPBPeers(peers)
for i, pbp := range pbps {
c := ConnectionType(d.Connectedness(peers[i]))
......
......@@ -3,9 +3,9 @@ package dht
import (
"time"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
......@@ -18,7 +18,7 @@ type ProviderManager struct {
newprovs chan *addProv
getprovs chan *getProv
period time.Duration
ctxc.ContextCloser
ctxgroup.ContextGroup
}
type addProv struct {
......@@ -38,7 +38,7 @@ func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
pm.providers = make(map[u.Key][]*providerInfo)
pm.getlocal = make(chan chan []u.Key)
pm.local = make(map[u.Key]struct{})
pm.ContextCloser = ctxc.NewContextCloser(ctx, nil)
pm.ContextGroup = ctxgroup.WithContext(ctx)
pm.Children().Add(1)
go pm.run()
......
......@@ -50,7 +50,7 @@ func (dht *IpfsDHT) getPublicKey(pid peer.ID) (ci.PubKey, error) {
}
log.Debug("not in peerstore, searching dht.")
ctxT, _ := context.WithTimeout(dht.ContextCloser.Context(), time.Second*5)
ctxT, _ := context.WithTimeout(dht.ContextGroup.Context(), time.Second*5)
val, err := dht.GetValue(ctxT, u.Key("/pk/"+string(pid)))
if err != nil {
log.Warning("Failed to find requested public key.")
......
......@@ -40,7 +40,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
log.Debugf("%s PutValue qry part %v", dht.self, p)
err := dht.putValueToNetwork(ctx, p, string(key), rec)
if err != nil {
......@@ -75,7 +75,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
}
// setup the Query
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
val, peers, err := dht.getValueOrPeers(ctx, p, key)
if err != nil {
......@@ -159,7 +159,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
}
// setup the Query
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findProvidersSingle(ctx, p, key)
if err != nil {
......@@ -262,7 +262,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error)
}
// setup the Query
query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil {
......@@ -316,7 +316,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
}
// setup the Query
query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment