client.go 4.04 KB
Newer Older
1
package supernode
2 3 4 5 6 7

import (
	"bytes"
	"time"

	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
8
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
9
	"github.com/jbenet/go-ipfs/p2p/host"
10 11 12
	peer "github.com/jbenet/go-ipfs/p2p/peer"
	routing "github.com/jbenet/go-ipfs/routing"
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
13
	proxy "github.com/jbenet/go-ipfs/routing/supernode/proxy"
14 15 16 17 18
	eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
	u "github.com/jbenet/go-ipfs/util"
	errors "github.com/jbenet/go-ipfs/util/debugerror"
)

19
var log = eventlog.Logger("supernode")
20 21

type Client struct {
22
	peerhost  host.Host
23 24 25 26 27 28
	peerstore peer.Peerstore
	proxy     proxy.Proxy
	local     peer.ID
}

// TODO take in datastore/cache
29
func NewClient(px proxy.Proxy, h host.Host, ps peer.Peerstore, local peer.ID) (*Client, error) {
30 31 32 33
	return &Client{
		proxy:     px,
		local:     local,
		peerstore: ps,
34
		peerhost:  h,
35 36 37 38
	}, nil
}

func (c *Client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.PeerInfo {
39
	ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("findProviders"))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
40
	defer log.EventBegin(ctx, "findProviders", &k).Done()
41 42 43 44 45 46
	ch := make(chan peer.PeerInfo)
	go func() {
		defer close(ch)
		request := pb.NewMessage(pb.Message_GET_PROVIDERS, string(k), 0)
		response, err := c.proxy.SendRequest(ctx, request)
		if err != nil {
47
			log.Debug(errors.Wrap(err))
48 49 50 51 52
			return
		}
		for _, p := range pb.PBPeersToPeerInfos(response.GetProviderPeers()) {
			select {
			case <-ctx.Done():
53
				log.Debug(errors.Wrap(ctx.Err()))
54 55 56 57 58 59 60 61
				return
			case ch <- p:
			}
		}
	}()
	return ch
}

62
func (c *Client) PutValue(ctx context.Context, k u.Key, v []byte) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
63
	defer log.EventBegin(ctx, "putValue", &k).Done()
64 65 66 67 68 69 70 71 72 73
	r, err := makeRecord(c.peerstore, c.local, k, v)
	if err != nil {
		return err
	}
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(k), 0)
	pmes.Record = r
	return c.proxy.SendMessage(ctx, pmes) // wrap to hide the remote
}

func (c *Client) GetValue(ctx context.Context, k u.Key) ([]byte, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
74
	defer log.EventBegin(ctx, "getValue", &k).Done()
75 76 77 78 79 80 81 82 83
	msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0)
	response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
	if err != nil {
		return nil, errors.Wrap(err)
	}
	return response.Record.GetValue(), nil
}

func (c *Client) Provide(ctx context.Context, k u.Key) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
84
	defer log.EventBegin(ctx, "provide", &k).Done()
85
	msg := pb.NewMessage(pb.Message_ADD_PROVIDER, string(k), 0)
86 87 88 89
	// FIXME how is connectedness defined for the local node
	pri := []pb.PeerRoutingInfo{
		pb.PeerRoutingInfo{
			PeerInfo: peer.PeerInfo{
90 91
				ID:    c.local,
				Addrs: c.peerhost.Addrs(),
92 93 94 95 96
			},
		},
	}
	msg.ProviderPeers = pb.PeerRoutingInfosToPBPeers(pri)
	return c.proxy.SendMessage(ctx, msg) // TODO wrap to hide remote
97 98 99
}

func (c *Client) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
100
	defer log.EventBegin(ctx, "findPeer", id).Done()
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
	request := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
	response, err := c.proxy.SendRequest(ctx, request) // hide remote
	if err != nil {
		return peer.PeerInfo{}, errors.Wrap(err)
	}
	for _, p := range pb.PBPeersToPeerInfos(response.GetCloserPeers()) {
		if p.ID == id {
			return p, nil
		}
	}
	return peer.PeerInfo{}, errors.New("could not find peer")
}

// creates and signs a record for the given key/value pair
func makeRecord(ps peer.Peerstore, p peer.ID, k u.Key, v []byte) (*pb.Record, error) {
	blob := bytes.Join([][]byte{[]byte(k), v, []byte(p)}, []byte{})
	sig, err := ps.PrivKey(p).Sign(blob)
	if err != nil {
		return nil, err
	}
	return &pb.Record{
		Key:       proto.String(string(k)),
		Value:     v,
		Author:    proto.String(string(p)),
		Signature: sig,
	}, nil
}

func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
130
	defer log.EventBegin(ctx, "ping", id).Done()
131
	return time.Nanosecond, errors.New("supernode routing does not support the ping method")
132 133
}

134 135 136 137
func (c *Client) Bootstrap(ctx context.Context) error {
	return c.proxy.Bootstrap(ctx)
}

138
var _ routing.IpfsRouting = &Client{}