Commit f7dad1aa authored by Brian Tiger Chow's avatar Brian Tiger Chow

feat(gcr/s) add eventlogs

parent 63164978
......@@ -36,6 +36,7 @@ func NewClient(px proxy.Proxy, h host.Host, ps peer.Peerstore, local peer.ID) (*
}
func (c *Client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.PeerInfo {
defer log.EventBegin(ctx, "findProviders", &k).Done()
ch := make(chan peer.PeerInfo)
go func() {
defer close(ch)
......@@ -58,6 +59,7 @@ func (c *Client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-cha
}
func (c *Client) PutValue(ctx context.Context, k u.Key, v []byte) error {
defer log.EventBegin(ctx, "putValue", &k).Done()
r, err := makeRecord(c.peerstore, c.local, k, v)
if err != nil {
return err
......@@ -68,6 +70,7 @@ func (c *Client) PutValue(ctx context.Context, k u.Key, v []byte) error {
}
func (c *Client) GetValue(ctx context.Context, k u.Key) ([]byte, error) {
defer log.EventBegin(ctx, "getValue", &k).Done()
msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0)
response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
if err != nil {
......@@ -77,6 +80,7 @@ func (c *Client) GetValue(ctx context.Context, k u.Key) ([]byte, error) {
}
func (c *Client) Provide(ctx context.Context, k u.Key) error {
defer log.EventBegin(ctx, "provide", &k).Done()
msg := pb.NewMessage(pb.Message_ADD_PROVIDER, string(k), 0)
// FIXME how is connectedness defined for the local node
pri := []pb.PeerRoutingInfo{
......@@ -92,6 +96,7 @@ func (c *Client) Provide(ctx context.Context, k u.Key) error {
}
func (c *Client) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) {
defer log.EventBegin(ctx, "findPeer", id).Done()
request := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
response, err := c.proxy.SendRequest(ctx, request) // hide remote
if err != nil {
......@@ -121,6 +126,7 @@ func makeRecord(ps peer.Peerstore, p peer.ID, k u.Key, v []byte) (*pb.Record, er
}
func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) {
defer log.EventBegin(ctx, "ping", id).Done()
return time.Nanosecond, errors.New("grandcentral routing does not support the ping method")
}
......
......@@ -13,10 +13,10 @@ import (
errors "github.com/jbenet/go-ipfs/util/debugerror"
)
var log = eventlog.Logger("proxy")
const ProtocolGCR = "/ipfs/grandcentral"
var log = eventlog.Logger("grandcentral/proxy")
type Proxy interface {
HandleStream(inet.Stream)
SendMessage(ctx context.Context, m *dhtpb.Message) error
......@@ -48,8 +48,15 @@ func (px *standard) SendMessage(ctx context.Context, m *dhtpb.Message) error {
return err // NB: returns the last error
}
func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) error {
if err := px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil {
func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) (err error) {
e := log.EventBegin(ctx, "sendRoutingMessage", px.Host.ID(), remote, m)
defer func() {
if err != nil {
e.SetError(err)
}
e.Done()
}()
if err = px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil {
return err
}
s, err := px.Host.NewStream(ProtocolGCR, remote)
......@@ -78,8 +85,15 @@ func (px *standard) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.M
return nil, err // NB: returns the last error
}
func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (*dhtpb.Message, error) {
if err := px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil {
func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (_ *dhtpb.Message, err error) {
e := log.EventBegin(ctx, "sendRoutingRequest", px.Host.ID(), remote, m)
defer func() {
if err != nil {
e.SetError(err)
}
e.Done()
}()
if err = px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil {
return nil, err
}
s, err := px.Host.NewStream(ProtocolGCR, remote)
......@@ -89,12 +103,12 @@ func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote pe
defer s.Close()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
if err := w.WriteMsg(m); err != nil {
if err = w.WriteMsg(m); err != nil {
return nil, err
}
var reply dhtpb.Message
if err := r.ReadMsg(&reply); err != nil {
if err = r.ReadMsg(&reply); err != nil {
return nil, err
}
// need ctx expiration?
......
......@@ -42,6 +42,8 @@ func (s *Server) HandleRequest(ctx context.Context, p peer.ID, req *dhtpb.Messag
func (s *Server) handleMessage(
ctx context.Context, p peer.ID, req *dhtpb.Message) (peer.ID, *dhtpb.Message) {
log.EventBegin(ctx, "routingMessageReceived", req, p, s.local).Done() // TODO may need to differentiate between local and remote
// FIXME threw everything into this switch statement to get things going.
// Once each operation is well-defined, extract pluggable backend so any
// database may be used.
......@@ -131,6 +133,7 @@ func putRoutingRecord(ds datastore.Datastore, k util.Key, value *dhtpb.Record) e
}
func putRoutingProviders(ds datastore.Datastore, k util.Key, providers []*dhtpb.Message_Peer) error {
log.Event(context.Background(), "putRoutingProviders", &k)
pkey := datastore.KeyWithNamespaces([]string{"routing", "providers", k.String()})
if v, err := ds.Get(pkey); err == nil {
if msg, ok := v.([]byte); ok {
......@@ -166,6 +169,7 @@ func storeProvidersToPeerstore(ps peer.Peerstore, p peer.ID, providers []*dhtpb.
}
func getRoutingProviders(local peer.ID, ds datastore.Datastore, k util.Key) ([]*dhtpb.Message_Peer, error) {
log.Event(context.Background(), "getProviders", local, &k)
var providers []*dhtpb.Message_Peer
exists, err := ds.Has(k.DsKey()) // TODO store values in a local datastore?
if err == nil && exists {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment