Commit 577baaf6 authored by Brian Tiger Chow's avatar Brian Tiger Chow

feat(routing.grandcentral): skeleton

fixes breakages:
  - peer.Peer -> peer.ID
  - peer.X -> peer.PeerInfo
  - netmsg -> p2p streams
parent 92c7f967
package grandcentral
import (
"bytes"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer"
routing "github.com/jbenet/go-ipfs/routing"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
proxy "github.com/jbenet/go-ipfs/routing/grandcentral/proxy"
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
u "github.com/jbenet/go-ipfs/util"
errors "github.com/jbenet/go-ipfs/util/debugerror"
)
var log = eventlog.Logger("grandcentral")
var ErrTODO = errors.New("TODO")
type Client struct {
peerstore peer.Peerstore
proxy proxy.Proxy
dialer inet.Network
local peer.ID
}
// TODO take in datastore/cache
func NewClient(d inet.Network, px proxy.Proxy, ps peer.Peerstore, local peer.ID) (*Client, error) {
return &Client{
dialer: d,
proxy: px,
local: local,
peerstore: ps,
}, nil
}
func (c *Client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.PeerInfo {
ch := make(chan peer.PeerInfo)
go func() {
defer close(ch)
request := pb.NewMessage(pb.Message_GET_PROVIDERS, string(k), 0)
response, err := c.proxy.SendRequest(ctx, request)
if err != nil {
log.Error(errors.Wrap(err))
return
}
for _, p := range pb.PBPeersToPeerInfos(response.GetProviderPeers()) {
select {
case <-ctx.Done():
log.Error(errors.Wrap(ctx.Err()))
return
case ch <- p:
}
}
}()
return ch
}
func (c *Client) PutValue(ctx context.Context, k u.Key, v []byte) error {
r, err := makeRecord(c.peerstore, c.local, k, v)
if err != nil {
return err
}
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(k), 0)
pmes.Record = r
return c.proxy.SendMessage(ctx, pmes) // wrap to hide the remote
}
func (c *Client) GetValue(ctx context.Context, k u.Key) ([]byte, error) {
msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0)
response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
if err != nil {
return nil, errors.Wrap(err)
}
return response.Record.GetValue(), nil
}
func (c *Client) Provide(ctx context.Context, k u.Key) error {
msg := pb.NewMessage(pb.Message_ADD_PROVIDER, string(k), 0)
// TODO wrap this to hide the dialer and the local/remote peers
msg.ProviderPeers = pb.PeerInfosToPBPeers(c.dialer, []peer.PeerInfo{peer.PeerInfo{ID: c.local}}) // FIXME how is connectedness defined for the local node
return c.proxy.SendMessage(ctx, msg) // TODO wrap to hide remote
}
func (c *Client) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) {
request := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
response, err := c.proxy.SendRequest(ctx, request) // hide remote
if err != nil {
return peer.PeerInfo{}, errors.Wrap(err)
}
for _, p := range pb.PBPeersToPeerInfos(response.GetCloserPeers()) {
if p.ID == id {
return p, nil
}
}
return peer.PeerInfo{}, errors.New("could not find peer")
}
// creates and signs a record for the given key/value pair
func makeRecord(ps peer.Peerstore, p peer.ID, k u.Key, v []byte) (*pb.Record, error) {
blob := bytes.Join([][]byte{[]byte(k), v, []byte(p)}, []byte{})
sig, err := ps.PrivKey(p).Sign(blob)
if err != nil {
return nil, err
}
return &pb.Record{
Key: proto.String(string(k)),
Value: v,
Author: proto.String(string(p)),
Signature: sig,
}, nil
}
func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) {
return time.Nanosecond, errors.New("grandcentral routing does not support the ping method")
}
var _ routing.IpfsRouting = &Client{}
package proxy
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer"
ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
dhtpb "github.com/jbenet/go-ipfs/routing/dht/pb"
errors "github.com/jbenet/go-ipfs/util/debugerror"
)
// RequestHandler handles routing requests locally
type RequestHandler interface {
HandleRequest(ctx context.Context, p peer.ID, m *dhtpb.Message) *dhtpb.Message
}
// Loopback forwards requests to a local handler
type Loopback struct {
Handler RequestHandler
Local peer.ID
}
// SendMessage intercepts local requests, forwarding them to a local handler
func (lb *Loopback) SendMessage(ctx context.Context, m *dhtpb.Message) error {
response := lb.Handler.HandleRequest(ctx, lb.Local, m)
if response != nil {
log.Warning("loopback handler returned unexpected message")
}
return nil
}
// SendRequest intercepts local requests, forwarding them to a local handler
func (lb *Loopback) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) {
return lb.Handler.HandleRequest(ctx, lb.Local, m), nil
}
func (lb *Loopback) handleNewStream(s inet.Stream) {
defer s.Close()
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
var incoming dhtpb.Message
if err := pbr.ReadMsg(&incoming); err != nil {
log.Error(errors.Wrap(err))
return
}
ctx := context.TODO()
outgoing := lb.Handler.HandleRequest(ctx, s.Conn().RemotePeer(), &incoming)
pbw := ggio.NewDelimitedWriter(s)
if err := pbw.WriteMsg(outgoing); err != nil {
return // TODO logerr
}
}
package proxy
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
host "github.com/jbenet/go-ipfs/p2p/host"
ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer"
dhtpb "github.com/jbenet/go-ipfs/routing/dht/pb"
errors "github.com/jbenet/go-ipfs/util/debugerror"
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
)
var log = eventlog.Logger("proxy")
type Proxy interface {
SendMessage(ctx context.Context, m *dhtpb.Message) error
SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error)
}
type standard struct {
Host host.Host
Remote peer.ID
}
func Standard(h host.Host, remote peer.ID) Proxy {
return &standard{h, remote}
}
const ProtocolGCR = "/ipfs/grandcentral"
func (px *standard) SendMessage(ctx context.Context, m *dhtpb.Message) error {
if err := px.Host.Connect(ctx, peer.PeerInfo{ID: px.Remote}); err != nil {
return err
}
s, err := px.Host.NewStream(ProtocolGCR, px.Remote)
if err != nil {
return err
}
defer s.Close()
pbw := ggio.NewDelimitedWriter(s)
if err := pbw.WriteMsg(m); err != nil {
return errors.Wrap(err)
}
return nil
}
func (px *standard) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) {
if err := px.Host.Connect(ctx, peer.PeerInfo{ID: px.Remote}); err != nil {
return nil, err
}
s, err := px.Host.NewStream(ProtocolGCR, px.Remote)
if err != nil {
return nil, err
}
defer s.Close()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
if err := w.WriteMsg(m); err != nil {
return nil, err
}
var reply dhtpb.Message
if err := r.ReadMsg(&reply); err != nil {
return nil, err
}
// need ctx expiration?
if &reply == nil {
return nil, errors.New("no response to request")
}
return &reply, nil
}
package grandcentral
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer"
dhtpb "github.com/jbenet/go-ipfs/routing/dht/pb"
proxy "github.com/jbenet/go-ipfs/routing/grandcentral/proxy"
util "github.com/jbenet/go-ipfs/util"
errors "github.com/jbenet/go-ipfs/util/debugerror"
)
// Server handles routing queries using a database backend
type Server struct {
local peer.ID
datastore datastore.ThreadSafeDatastore
dialer inet.Network
peerstore peer.Peerstore
*proxy.Loopback // so server can be injected into client
}
// NewServer creates a new GrandCentral routing Server
func NewServer(ds datastore.ThreadSafeDatastore, d inet.Network, ps peer.Peerstore, local peer.ID) (*Server, error) {
s := &Server{local, ds, d, ps, nil}
s.Loopback = &proxy.Loopback{
Handler: s,
Local: local,
}
return s, nil
}
// HandleLocalRequest implements the proxy.RequestHandler interface. This is
// where requests are received from the outside world.
func (s *Server) HandleRequest(ctx context.Context, p peer.ID, req *dhtpb.Message) *dhtpb.Message {
_, response := s.handleMessage(ctx, p, req) // ignore response peer. it's local.
return response
}
// TODO extract backend. backend can be implemented with whatever database we desire
func (s *Server) handleMessage(
ctx context.Context, p peer.ID, req *dhtpb.Message) (peer.ID, *dhtpb.Message) {
// FIXME threw everything into this switch statement to get things going.
// Once each operation is well-defined, extract pluggable backend so any
// database may be used.
var response = dhtpb.NewMessage(req.GetType(), req.GetKey(), req.GetClusterLevel())
switch req.GetType() {
case dhtpb.Message_GET_VALUE:
dskey := util.Key(req.GetKey()).DsKey()
val, err := s.datastore.Get(dskey)
if err != nil {
log.Error(errors.Wrap(err))
return "", nil
}
rawRecord, ok := val.([]byte)
if !ok {
log.Errorf("datastore had non byte-slice value for %v", dskey)
return "", nil
}
if err := proto.Unmarshal(rawRecord, response.Record); err != nil {
log.Error("failed to unmarshal dht record from datastore")
return "", nil
}
// TODO before merging: if we know any providers for the requested value, return those.
return p, response
case dhtpb.Message_PUT_VALUE:
// TODO before merging: verifyRecord(req.GetRecord())
data, err := proto.Marshal(req.GetRecord())
if err != nil {
log.Error(err)
return "", nil
}
dskey := util.Key(req.GetKey()).DsKey()
if err := s.datastore.Put(dskey, data); err != nil {
log.Error(err)
return "", nil
}
return p, req // TODO before merging: verify that we should return record
case dhtpb.Message_FIND_NODE:
p := s.peerstore.PeerInfo(peer.ID(req.GetKey()))
response.CloserPeers = dhtpb.PeerInfosToPBPeers(s.dialer, []peer.PeerInfo{p})
return p.ID, response
case dhtpb.Message_ADD_PROVIDER:
for _, provider := range req.GetProviderPeers() {
providerID := peer.ID(provider.GetId())
if providerID != p {
log.Errorf("provider message came from third-party %s", p)
continue
}
for _, maddr := range provider.Addresses() {
// FIXME do we actually want to store to peerstore
s.peerstore.AddAddress(p, maddr)
}
}
var providers []dhtpb.Message_Peer
pkey := datastore.KeyWithNamespaces([]string{"routing", "providers", req.GetKey()})
if v, err := s.datastore.Get(pkey); err == nil {
if protopeers, ok := v.([]dhtpb.Message_Peer); ok {
providers = append(providers, protopeers...)
}
}
if err := s.datastore.Put(pkey, providers); err != nil {
log.Error(err)
return "", nil
}
return "", nil
case dhtpb.Message_GET_PROVIDERS:
dskey := util.Key(req.GetKey()).DsKey()
exists, err := s.datastore.Has(dskey)
if err == nil && exists {
response.ProviderPeers = append(response.ProviderPeers, dhtpb.PeerInfosToPBPeers(s.dialer, []peer.PeerInfo{peer.PeerInfo{ID: s.local}})...)
}
// FIXME(btc) is this how we want to persist this data?
pkey := datastore.KeyWithNamespaces([]string{"routing", "providers", req.GetKey()})
if v, err := s.datastore.Get(pkey); err == nil {
if protopeers, ok := v.([]dhtpb.Message_Peer); ok {
for _, p := range protopeers {
response.ProviderPeers = append(response.ProviderPeers, &p)
}
}
}
return p, response
case dhtpb.Message_PING:
return p, req
default:
}
return "", nil
}
var _ proxy.RequestHandler = &Server{}
var _ proxy.Proxy = &Server{}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment