Commit 330c8e84 authored by Steven Allen

remove supernode routing

It was never fully implemented and isn't used.

fixes #3950

(not removing routing/mock because that *is* in use).

License: MIT
Signed-off-by: Steven Allen <steven@stebalien.com>
parent 75382869

package supernode

import (
    "bytes"
    "context"
    "errors"
    "time"

    proxy "github.com/ipfs/go-ipfs/routing/supernode/proxy"

    cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
    routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
    pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
    logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
    loggables "gx/ipfs/QmT4PgCNdv73hnFAqzHqwW44q7M9PWpykSswHDxndquZbc/go-libp2p-loggables"
    dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb"
    peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
    proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
    "gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host"
    pb "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record/pb"
)

var log = logging.Logger("supernode")

// Client is a routing client that forwards all queries to a set of remote
// supernode routers through a proxy.
type Client struct {
    peerhost  host.Host
    peerstore pstore.Peerstore
    proxy     proxy.Proxy
    local     peer.ID
}

// NewClient creates a new supernode routing client.
// TODO take in datastore/cache
func NewClient(px proxy.Proxy, h host.Host, ps pstore.Peerstore, local peer.ID) (*Client, error) {
    return &Client{
        proxy:     px,
        local:     local,
        peerstore: ps,
        peerhost:  h,
    }, nil
}

func (c *Client) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan pstore.PeerInfo {
    ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("findProviders"))
    defer log.EventBegin(ctx, "findProviders", k).Done()
    ch := make(chan pstore.PeerInfo)
    go func() {
        defer close(ch)
        request := dhtpb.NewMessage(dhtpb.Message_GET_PROVIDERS, k.KeyString(), 0)
        response, err := c.proxy.SendRequest(ctx, request)
        if err != nil {
            log.Debug(err)
            return
        }
        for _, p := range dhtpb.PBPeersToPeerInfos(response.GetProviderPeers()) {
            select {
            case <-ctx.Done():
                log.Debug(ctx.Err())
                return
            case ch <- *p:
            }
        }
    }()
    return ch
}

func (c *Client) PutValue(ctx context.Context, k string, v []byte) error {
    defer log.EventBegin(ctx, "putValue").Done()
    r, err := makeRecord(c.peerstore, c.local, k, v)
    if err != nil {
        return err
    }
    pmes := dhtpb.NewMessage(dhtpb.Message_PUT_VALUE, k, 0)
    pmes.Record = r
    return c.proxy.SendMessage(ctx, pmes) // wrap to hide the remote
}

func (c *Client) GetValue(ctx context.Context, k string) ([]byte, error) {
    defer log.EventBegin(ctx, "getValue").Done()
    msg := dhtpb.NewMessage(dhtpb.Message_GET_VALUE, k, 0)
    response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
    if err != nil {
        return nil, err
    }
    return response.Record.GetValue(), nil
}

func (c *Client) GetValues(ctx context.Context, k string, _ int) ([]routing.RecvdVal, error) {
    defer log.EventBegin(ctx, "getValues").Done()
    msg := dhtpb.NewMessage(dhtpb.Message_GET_VALUE, k, 0)
    response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
    if err != nil {
        return nil, err
    }
    return []routing.RecvdVal{
        {
            Val:  response.Record.GetValue(),
            From: c.local,
        },
    }, nil
}

// Provide adds the given key 'k' to the content routing system. If 'brd' is
// true, it announces that content to the network. For the supernode client,
// setting 'brd' to false makes this call a no-op.
func (c *Client) Provide(ctx context.Context, k *cid.Cid, brd bool) error {
    if !brd {
        return nil
    }
    defer log.EventBegin(ctx, "provide", k).Done()
    msg := dhtpb.NewMessage(dhtpb.Message_ADD_PROVIDER, k.KeyString(), 0)
    // FIXME how is connectedness defined for the local node
    pri := []dhtpb.PeerRoutingInfo{
        {
            PeerInfo: pstore.PeerInfo{
                ID:    c.local,
                Addrs: c.peerhost.Addrs(),
            },
        },
    }
    msg.ProviderPeers = dhtpb.PeerRoutingInfosToPBPeers(pri)
    return c.proxy.SendMessage(ctx, msg) // TODO wrap to hide remote
}

func (c *Client) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo, error) {
    defer log.EventBegin(ctx, "findPeer", id).Done()
    request := dhtpb.NewMessage(dhtpb.Message_FIND_NODE, string(id), 0)
    response, err := c.proxy.SendRequest(ctx, request) // hide remote
    if err != nil {
        return pstore.PeerInfo{}, err
    }
    for _, p := range dhtpb.PBPeersToPeerInfos(response.GetCloserPeers()) {
        if p.ID == id {
            return *p, nil
        }
    }
    return pstore.PeerInfo{}, errors.New("could not find peer")
}

// makeRecord creates and signs a record for the given key/value pair.
func makeRecord(ps pstore.Peerstore, p peer.ID, k string, v []byte) (*pb.Record, error) {
    blob := bytes.Join([][]byte{[]byte(k), v, []byte(p)}, []byte{})
    sig, err := ps.PrivKey(p).Sign(blob)
    if err != nil {
        return nil, err
    }
    return &pb.Record{
        Key:       proto.String(k),
        Value:     v,
        Author:    proto.String(string(p)),
        Signature: sig,
    }, nil
}

func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) {
    defer log.EventBegin(ctx, "ping", id).Done()
    return time.Nanosecond, errors.New("supernode routing does not support the ping method")
}

func (c *Client) Bootstrap(ctx context.Context) error {
    return c.proxy.Bootstrap(ctx)
}

var _ routing.IpfsRouting = &Client{}
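
For context, a minimal wiring sketch (not part of this commit; the host `h`, peerstore `ps`, and router list `remotes` are assumed to already exist) showing how this client was typically built around a Standard proxy:

    // newSupernodeRouting is a hypothetical helper: connect to the remote
    // routers, then route all queries through them.
    func newSupernodeRouting(ctx context.Context, h host.Host, ps pstore.Peerstore, remotes []pstore.PeerInfo) (routing.IpfsRouting, error) {
        px := proxy.Standard(h, remotes)
        if err := px.Bootstrap(ctx); err != nil { // currently only logs; never fails
            return nil, err
        }
        return NewClient(px, h, ps, h.ID())
    }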

package proxy

import (
    context "context"

    inet "gx/ipfs/QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1/go-libp2p-net"
    dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb"
    peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
    ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
)

// RequestHandler handles routing requests locally
type RequestHandler interface {
    HandleRequest(ctx context.Context, p peer.ID, m *dhtpb.Message) *dhtpb.Message
}

// Loopback forwards requests to a local handler
type Loopback struct {
    Handler RequestHandler
    Local   peer.ID
}

func (_ *Loopback) Bootstrap(ctx context.Context) error {
    return nil
}

// SendMessage intercepts local requests, forwarding them to a local handler
func (lb *Loopback) SendMessage(ctx context.Context, m *dhtpb.Message) error {
    response := lb.Handler.HandleRequest(ctx, lb.Local, m)
    if response != nil {
        log.Warning("loopback handler returned unexpected message")
    }
    return nil
}

// SendRequest intercepts local requests, forwarding them to a local handler
func (lb *Loopback) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) {
    return lb.Handler.HandleRequest(ctx, lb.Local, m), nil
}

// HandleStream reads a single message from the stream, dispatches it to the
// local handler, and writes the handler's response back on the same stream.
func (lb *Loopback) HandleStream(s inet.Stream) {
    defer s.Close()
    pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
    var incoming dhtpb.Message
    if err := pbr.ReadMsg(&incoming); err != nil {
        s.Reset()
        log.Debug(err)
        return
    }
    ctx := context.TODO()
    outgoing := lb.Handler.HandleRequest(ctx, s.Conn().RemotePeer(), &incoming)
    pbw := ggio.NewDelimitedWriter(s)
    if err := pbw.WriteMsg(outgoing); err != nil {
        s.Reset()
        log.Debug(err)
        return
    }
}
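
A short sketch of the loopback path (hypothetical code, assuming this package's imports): SendRequest never touches the network; it simply invokes the configured handler with the local peer ID.

    // echoHandler is a stub RequestHandler that returns the request itself.
    type echoHandler struct{}

    func (echoHandler) HandleRequest(ctx context.Context, p peer.ID, m *dhtpb.Message) *dhtpb.Message {
        return m
    }

    // ping sends a PING through a Loopback and gets the echoed message back.
    func ping(ctx context.Context, self peer.ID) (*dhtpb.Message, error) {
        lb := &Loopback{Handler: echoHandler{}, Local: self}
        return lb.SendRequest(ctx, dhtpb.NewMessage(dhtpb.Message_PING, "", 0))
    }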

package proxy

import (
    "context"
    "errors"

    inet "gx/ipfs/QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1/go-libp2p-net"
    pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
    kbucket "gx/ipfs/QmSAFA8v42u4gpJNy1tb7vW3JiiXiaYDC2b845c2RnNSJL/go-libp2p-kbucket"
    logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
    loggables "gx/ipfs/QmT4PgCNdv73hnFAqzHqwW44q7M9PWpykSswHDxndquZbc/go-libp2p-loggables"
    dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb"
    peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
    ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
    host "gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host"
)

// ProtocolSNR is the libp2p protocol ID for supernode routing.
const ProtocolSNR = "/ipfs/supernoderouting"

var log = logging.Logger("supernode/proxy")

// Proxy forwards routing messages and requests to a set of remote routers.
type Proxy interface {
    Bootstrap(context.Context) error
    HandleStream(inet.Stream)
    SendMessage(ctx context.Context, m *dhtpb.Message) error
    SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error)
}

type standard struct {
    Host host.Host

    remoteInfos []pstore.PeerInfo // addr required for bootstrapping
    remoteIDs   []peer.ID         // []ID is required for each req. here, cached for performance.
}

// Standard creates a proxy that forwards to the given remote routers.
func Standard(h host.Host, remotes []pstore.PeerInfo) Proxy {
    var ids []peer.ID
    for _, remote := range remotes {
        ids = append(ids, remote.ID)
    }
    return &standard{h, remotes, ids}
}

// Bootstrap connects to as many of the remote routers as possible. It logs
// an error when no router is reachable but never fails outright.
func (px *standard) Bootstrap(ctx context.Context) error {
    var cxns []pstore.PeerInfo
    for _, info := range px.remoteInfos {
        if err := px.Host.Connect(ctx, info); err != nil {
            continue
        }
        cxns = append(cxns, info)
    }
    if len(cxns) == 0 {
        log.Error("unable to bootstrap to any supernode routers")
    } else {
        log.Infof("bootstrapped to %d supernode routers: %s", len(cxns), cxns)
    }
    return nil
}

func (px *standard) HandleStream(s inet.Stream) {
    // TODO(brian): Should clients be able to satisfy requests?
    log.Error("supernode client received (dropped) a routing message from", s.Conn().RemotePeer())
    s.Reset()
}

const replicationFactor = 2

// SendMessage sends the message to each remote sequentially (ordered by XOR
// distance to the message key). For ADD_PROVIDER and PUT_VALUE it keeps
// sending until replicationFactor remotes have accepted the message; for
// other message types it stops after the first success. If every send
// fails, the last error is returned.
func (px *standard) SendMessage(ctx context.Context, m *dhtpb.Message) error {
    var err error
    var numSuccesses int
    for _, remote := range sortedByKey(px.remoteIDs, m.GetKey()) {
        if err = px.sendMessage(ctx, m, remote); err != nil { // careful don't re-declare err!
            continue
        }
        numSuccesses++
        switch m.GetType() {
        case dhtpb.Message_ADD_PROVIDER, dhtpb.Message_PUT_VALUE:
            if numSuccesses < replicationFactor {
                continue
            }
        }
        return nil // success
    }
    return err // NB: returns the last error
}

func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) (err error) {
    e := log.EventBegin(ctx, "sendRoutingMessage", px.Host.ID(), remote, m)
    defer func() {
        if err != nil {
            e.SetError(err)
        }
        e.Done()
    }()
    if err = px.Host.Connect(ctx, pstore.PeerInfo{ID: remote}); err != nil {
        return err
    }
    s, err := px.Host.NewStream(ctx, remote, ProtocolSNR)
    if err != nil {
        return err
    }
    pbw := ggio.NewDelimitedWriter(s)
    err = pbw.WriteMsg(m)
    if err == nil {
        s.Close()
    } else {
        s.Reset()
    }
    return err
}

// SendRequest sends the request to each remote sequentially (ordered by XOR
// distance to the request key), returning the first successful response. If
// every remote fails, the last error is returned.
func (px *standard) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) {
    var err error
    for _, remote := range sortedByKey(px.remoteIDs, m.GetKey()) {
        var reply *dhtpb.Message
        reply, err = px.sendRequest(ctx, m, remote) // careful don't redeclare err!
        if err != nil {
            continue
        }
        return reply, nil // success
    }
    return nil, err // NB: returns the last error
}

func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (*dhtpb.Message, error) {
    e := log.EventBegin(ctx, "sendRoutingRequest", px.Host.ID(), remote, logging.Pair("request", m))
    defer e.Done()
    if err := px.Host.Connect(ctx, pstore.PeerInfo{ID: remote}); err != nil {
        e.SetError(err)
        return nil, err
    }
    s, err := px.Host.NewStream(ctx, remote, ProtocolSNR)
    if err != nil {
        e.SetError(err)
        return nil, err
    }
    defer s.Close()
    r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
    w := ggio.NewDelimitedWriter(s)
    if err = w.WriteMsg(m); err != nil {
        s.Reset()
        e.SetError(err)
        return nil, err
    }
    response := &dhtpb.Message{}
    if err = r.ReadMsg(response); err != nil {
        s.Reset()
        e.SetError(err)
        return nil, err
    }
    // need ctx expiration?
    if response == nil {
        s.Reset()
        err := errors.New("no response to request")
        e.SetError(err)
        return nil, err
    }
    e.Append(logging.Pair("response", response))
    e.Append(logging.Pair("uuid", loggables.Uuid("foo")))
    return response, nil
}

// sortedByKey orders peers by XOR distance from the given key.
func sortedByKey(peers []peer.ID, skey string) []peer.ID {
    target := kbucket.ConvertKey(skey)
    return kbucket.SortClosestPeers(peers, target)
}
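
To illustrate the request path end to end, a hypothetical snippet (not from this commit) that issues a raw GET_VALUE through a Standard proxy; remotes closest to the key by XOR distance are tried first:

    func getRaw(ctx context.Context, h host.Host, routers []pstore.PeerInfo, key string) ([]byte, error) {
        px := Standard(h, routers)
        resp, err := px.SendRequest(ctx, dhtpb.NewMessage(dhtpb.Message_GET_VALUE, key, 0))
        if err != nil {
            return nil, err
        }
        return resp.GetRecord().GetValue(), nil
    }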

package supernode

import (
    "context"
    "errors"
    "fmt"

    proxy "github.com/ipfs/go-ipfs/routing/supernode/proxy"
    dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"

    pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
    dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb"
    datastore "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
    peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
    proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
    pb "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record/pb"
)

// Server handles routing queries using a database backend
type Server struct {
    local           peer.ID
    routingBackend  datastore.Datastore
    peerstore       pstore.Peerstore
    *proxy.Loopback // so server can be injected into client
}

// NewServer creates a new Supernode routing Server
func NewServer(ds datastore.Datastore, ps pstore.Peerstore, local peer.ID) (*Server, error) {
    s := &Server{local, ds, ps, nil}
    s.Loopback = &proxy.Loopback{
        Handler: s,
        Local:   local,
    }
    return s, nil
}

func (_ *Server) Bootstrap(ctx context.Context) error {
    return nil
}

// HandleRequest implements the proxy.RequestHandler interface. This is
// where requests are received from the outside world.
func (s *Server) HandleRequest(ctx context.Context, p peer.ID, req *dhtpb.Message) *dhtpb.Message {
    _, response := s.handleMessage(ctx, p, req) // ignore response peer. it's local.
    return response
}

func (s *Server) handleMessage(ctx context.Context, p peer.ID, req *dhtpb.Message) (peer.ID, *dhtpb.Message) {
    defer log.EventBegin(ctx, "routingMessageReceived", req, p).Done()

    var response = dhtpb.NewMessage(req.GetType(), req.GetKey(), req.GetClusterLevel())
    switch req.GetType() {
    case dhtpb.Message_GET_VALUE:
        rawRecord, err := getRoutingRecord(s.routingBackend, req.GetKey())
        if err != nil {
            return "", nil
        }
        response.Record = rawRecord
        return p, response

    case dhtpb.Message_PUT_VALUE:
        // FIXME: verify complains that the peer's ID is not present in the
        // peerstore. Mocknet problem?
        // if err := verify(s.peerstore, req.GetRecord()); err != nil {
        //     log.Event(ctx, "validationFailed", req, p)
        //     return "", nil
        // }
        putRoutingRecord(s.routingBackend, req.GetKey(), req.GetRecord())
        return p, req

    case dhtpb.Message_FIND_NODE:
        p := s.peerstore.PeerInfo(peer.ID(req.GetKey()))
        pri := []dhtpb.PeerRoutingInfo{
            {
                PeerInfo: p,
                // Connectedness: TODO
            },
        }
        response.CloserPeers = dhtpb.PeerRoutingInfosToPBPeers(pri)
        return p.ID, response

    case dhtpb.Message_ADD_PROVIDER:
        for _, provider := range req.GetProviderPeers() {
            providerID := peer.ID(provider.GetId())
            if providerID == p {
                store := []*dhtpb.Message_Peer{provider}
                storeProvidersToPeerstore(s.peerstore, p, store)
                if err := putRoutingProviders(s.routingBackend, req.GetKey(), store); err != nil {
                    return "", nil
                }
            } else {
                log.Event(ctx, "addProviderBadRequest", p, req)
            }
        }
        return "", nil

    case dhtpb.Message_GET_PROVIDERS:
        providers, err := getRoutingProviders(s.routingBackend, req.GetKey())
        if err != nil {
            return "", nil
        }
        response.ProviderPeers = providers
        return p, response

    case dhtpb.Message_PING:
        return p, req

    default:
    }
    return "", nil
}

var _ proxy.RequestHandler = &Server{}
var _ proxy.Proxy = &Server{}

func getRoutingRecord(ds datastore.Datastore, k string) (*pb.Record, error) {
    dskey := dshelp.NewKeyFromBinary([]byte(k))
    val, err := ds.Get(dskey)
    if err != nil {
        return nil, err
    }
    recordBytes, ok := val.([]byte)
    if !ok {
        return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
    }
    var record pb.Record
    if err := proto.Unmarshal(recordBytes, &record); err != nil {
        return nil, errors.New("failed to unmarshal dht record from datastore")
    }
    return &record, nil
}

func putRoutingRecord(ds datastore.Datastore, k string, value *pb.Record) error {
    data, err := proto.Marshal(value)
    if err != nil {
        return err
    }
    dskey := dshelp.NewKeyFromBinary([]byte(k))
    // TODO namespace
    return ds.Put(dskey, data)
}

// putRoutingProviders merges the new provider records with any existing
// records for the key (deduplicating by peer ID) and stores the result.
func putRoutingProviders(ds datastore.Datastore, k string, newRecords []*dhtpb.Message_Peer) error {
    log.Event(context.Background(), "putRoutingProviders")
    oldRecords, err := getRoutingProviders(ds, k)
    if err != nil {
        return err
    }
    mergedRecords := make(map[string]*dhtpb.Message_Peer)
    for _, provider := range oldRecords {
        mergedRecords[provider.GetId()] = provider // add original records
    }
    for _, provider := range newRecords {
        mergedRecords[provider.GetId()] = provider // overwrite old record if new exists
    }
    var protomsg dhtpb.Message
    protomsg.ProviderPeers = make([]*dhtpb.Message_Peer, 0, len(mergedRecords))
    for _, provider := range mergedRecords {
        protomsg.ProviderPeers = append(protomsg.ProviderPeers, provider)
    }
    data, err := proto.Marshal(&protomsg)
    if err != nil {
        return err
    }
    return ds.Put(providerKey(k), data)
}

func storeProvidersToPeerstore(ps pstore.Peerstore, p peer.ID, providers []*dhtpb.Message_Peer) {
    for _, provider := range providers {
        providerID := peer.ID(provider.GetId())
        if providerID != p {
            log.Errorf("provider message came from third-party %s", p)
            continue
        }
        for _, maddr := range provider.Addresses() {
            // as a router, we want to store addresses for peers who have provided
            ps.AddAddr(p, maddr, pstore.AddressTTL)
        }
    }
}

func getRoutingProviders(ds datastore.Datastore, k string) ([]*dhtpb.Message_Peer, error) {
    e := log.EventBegin(context.Background(), "getProviders")
    defer e.Done()
    var providers []*dhtpb.Message_Peer
    if v, err := ds.Get(providerKey(k)); err == nil {
        if data, ok := v.([]byte); ok {
            var msg dhtpb.Message
            if err := proto.Unmarshal(data, &msg); err != nil {
                return nil, err
            }
            providers = append(providers, msg.GetProviderPeers()...)
        }
    }
    return providers, nil
}

func providerKey(k string) datastore.Key {
    return datastore.KeyWithNamespaces([]string{"routing", "providers", k})
}
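
Because Server embeds *proxy.Loopback and asserts proxy.Proxy above, it can be injected straight into the client. A hypothetical sketch (assuming this package's imports, plus the host and routing imports from the client file) of a node that answers its own routing queries in-process:

    func newLocalRouting(h host.Host, ps pstore.Peerstore, ds datastore.Datastore) (routing.IpfsRouting, error) {
        server, err := NewServer(ds, ps, h.ID())
        if err != nil {
            return nil, err
        }
        return NewClient(server, h, ps, h.ID())
    }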

package supernode

import (
    "testing"

    dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb"
    datastore "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
)

func TestPutProviderDoesntResultInDuplicates(t *testing.T) {
    routingBackend := datastore.NewMapDatastore()
    k := "foo"
    put := []*dhtpb.Message_Peer{
        convPeer("bob", "127.0.0.1/tcp/4001"),
        convPeer("alice", "10.0.0.10/tcp/4001"),
    }
    if err := putRoutingProviders(routingBackend, k, put); err != nil {
        t.Fatal(err)
    }
    if err := putRoutingProviders(routingBackend, k, put); err != nil {
        t.Fatal(err)
    }
    got, err := getRoutingProviders(routingBackend, k)
    if err != nil {
        t.Fatal(err)
    }
    if len(got) != 2 {
        t.Fatal("should be 2 values, but there are", len(got))
    }
}

func convPeer(name string, addrs ...string) *dhtpb.Message_Peer {
    var rawAddrs [][]byte
    for _, addr := range addrs {
        rawAddrs = append(rawAddrs, []byte(addr))
    }
    return &dhtpb.Message_Peer{Id: &name, Addrs: rawAddrs}
}