client.go 3.67 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
package grandcentral

import (
	"bytes"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
	inet "github.com/jbenet/go-ipfs/p2p/net"
	peer "github.com/jbenet/go-ipfs/p2p/peer"
	routing "github.com/jbenet/go-ipfs/routing"
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
	proxy "github.com/jbenet/go-ipfs/routing/grandcentral/proxy"
	eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
	u "github.com/jbenet/go-ipfs/util"
	errors "github.com/jbenet/go-ipfs/util/debugerror"
)

var log = eventlog.Logger("grandcentral")

var ErrTODO = errors.New("TODO")

type Client struct {
	peerstore peer.Peerstore
	proxy     proxy.Proxy
	dialer    inet.Network
	local     peer.ID
}

// TODO take in datastore/cache
func NewClient(d inet.Network, px proxy.Proxy, ps peer.Peerstore, local peer.ID) (*Client, error) {
	return &Client{
		dialer:    d,
		proxy:     px,
		local:     local,
		peerstore: ps,
	}, nil
}

func (c *Client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.PeerInfo {
	ch := make(chan peer.PeerInfo)
	go func() {
		defer close(ch)
		request := pb.NewMessage(pb.Message_GET_PROVIDERS, string(k), 0)
		response, err := c.proxy.SendRequest(ctx, request)
		if err != nil {
47
			log.Debug(errors.Wrap(err))
48 49 50 51 52
			return
		}
		for _, p := range pb.PBPeersToPeerInfos(response.GetProviderPeers()) {
			select {
			case <-ctx.Done():
53
				log.Debug(errors.Wrap(ctx.Err()))
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
				return
			case ch <- p:
			}
		}
	}()
	return ch
}

func (c *Client) PutValue(ctx context.Context, k u.Key, v []byte) error {
	r, err := makeRecord(c.peerstore, c.local, k, v)
	if err != nil {
		return err
	}
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(k), 0)
	pmes.Record = r
	return c.proxy.SendMessage(ctx, pmes) // wrap to hide the remote
}

func (c *Client) GetValue(ctx context.Context, k u.Key) ([]byte, error) {
	msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0)
	response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
	if err != nil {
		return nil, errors.Wrap(err)
	}
	return response.Record.GetValue(), nil
}

func (c *Client) Provide(ctx context.Context, k u.Key) error {
	msg := pb.NewMessage(pb.Message_ADD_PROVIDER, string(k), 0)
	// TODO wrap this to hide the dialer and the local/remote peers
	msg.ProviderPeers = pb.PeerInfosToPBPeers(c.dialer, []peer.PeerInfo{peer.PeerInfo{ID: c.local}}) // FIXME how is connectedness defined for the local node
	return c.proxy.SendMessage(ctx, msg)                                                             // TODO wrap to hide remote
}

func (c *Client) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) {
	request := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
	response, err := c.proxy.SendRequest(ctx, request) // hide remote
	if err != nil {
		return peer.PeerInfo{}, errors.Wrap(err)
	}
	for _, p := range pb.PBPeersToPeerInfos(response.GetCloserPeers()) {
		if p.ID == id {
			return p, nil
		}
	}
	return peer.PeerInfo{}, errors.New("could not find peer")
}

// creates and signs a record for the given key/value pair
func makeRecord(ps peer.Peerstore, p peer.ID, k u.Key, v []byte) (*pb.Record, error) {
	blob := bytes.Join([][]byte{[]byte(k), v, []byte(p)}, []byte{})
	sig, err := ps.PrivKey(p).Sign(blob)
	if err != nil {
		return nil, err
	}
	return &pb.Record{
		Key:       proto.String(string(k)),
		Value:     v,
		Author:    proto.String(string(p)),
		Signature: sig,
	}, nil
}

func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) {
	return time.Nanosecond, errors.New("grandcentral routing does not support the ping method")
}

var _ routing.IpfsRouting = &Client{}