client.go 4.64 KB
Newer Older
1
package supernode
2 3 4

import (
	"bytes"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5
	"errors"
6 7
	"time"

8 9
	key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"

10 11 12
	routing "github.com/ipfs/go-ipfs/routing"
	pb "github.com/ipfs/go-ipfs/routing/dht/pb"
	proxy "github.com/ipfs/go-ipfs/routing/supernode/proxy"
13
	loggables "gx/ipfs/QmYrv4LgCC8FhG2Ab4bwuq5DqBdwMtx3hMb3KKJDZcr2d7/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
14

Jeromy's avatar
Jeromy committed
15 16 17
	pstore "gx/ipfs/QmSZi9ygLohBUGyHMqE5N6eToPwqcg7bZQTULeVLFu7Q6d/go-libp2p-peerstore"
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
	peer "gx/ipfs/QmWtbQU15LaB5B1JC2F7TV9P4K88vD3PpA4AJrwfCjhML8/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
18 19
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Jeromy's avatar
Jeromy committed
20
	"gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/host"
21 22
)

Jeromy's avatar
Jeromy committed
23
// log is the package-level logger, registered under the name "supernode".
var log = logging.Logger("supernode")
24 25

type Client struct {
26
	peerhost  host.Host
Jeromy's avatar
Jeromy committed
27
	peerstore pstore.Peerstore
28 29 30 31 32
	proxy     proxy.Proxy
	local     peer.ID
}

// TODO take in datastore/cache
Jeromy's avatar
Jeromy committed
33
func NewClient(px proxy.Proxy, h host.Host, ps pstore.Peerstore, local peer.ID) (*Client, error) {
34 35 36 37
	return &Client{
		proxy:     px,
		local:     local,
		peerstore: ps,
38
		peerhost:  h,
39 40 41
	}, nil
}

Jeromy's avatar
Jeromy committed
42
func (c *Client) FindProvidersAsync(ctx context.Context, k key.Key, max int) <-chan pstore.PeerInfo {
43
	logging.ContextWithLoggable(ctx, loggables.Uuid("findProviders"))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44
	defer log.EventBegin(ctx, "findProviders", &k).Done()
Jeromy's avatar
Jeromy committed
45
	ch := make(chan pstore.PeerInfo)
46 47 48 49 50
	go func() {
		defer close(ch)
		request := pb.NewMessage(pb.Message_GET_PROVIDERS, string(k), 0)
		response, err := c.proxy.SendRequest(ctx, request)
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51
			log.Debug(err)
52 53 54 55 56
			return
		}
		for _, p := range pb.PBPeersToPeerInfos(response.GetProviderPeers()) {
			select {
			case <-ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57
				log.Debug(ctx.Err())
58 59 60 61 62 63 64 65
				return
			case ch <- p:
			}
		}
	}()
	return ch
}

66
func (c *Client) PutValue(ctx context.Context, k key.Key, v []byte) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
67
	defer log.EventBegin(ctx, "putValue", &k).Done()
68 69 70 71 72 73 74 75 76
	r, err := makeRecord(c.peerstore, c.local, k, v)
	if err != nil {
		return err
	}
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(k), 0)
	pmes.Record = r
	return c.proxy.SendMessage(ctx, pmes) // wrap to hide the remote
}

77
func (c *Client) GetValue(ctx context.Context, k key.Key) ([]byte, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
78
	defer log.EventBegin(ctx, "getValue", &k).Done()
79 80 81
	msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0)
	response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82
		return nil, err
83 84 85 86
	}
	return response.Record.GetValue(), nil
}

87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
// GetValues sends a single GET_VALUE request for k through the proxy and
// wraps the value of the returned record in a one-element slice. The count
// argument is ignored. The value is attributed to the local peer in From.
//
// NOTE(review): the event is logged under the name "getValue", the same name
// GetValue uses — confirm whether that is intentional.
func (c *Client) GetValues(ctx context.Context, k key.Key, _ int) ([]routing.RecvdVal, error) {
	defer log.EventBegin(ctx, "getValue", &k).Done()

	req := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0)
	resp, err := c.proxy.SendRequest(ctx, req) // TODO wrap to hide the remote
	if err != nil {
		return nil, err
	}

	only := routing.RecvdVal{
		Val:  resp.Record.GetValue(),
		From: c.local,
	}
	return []routing.RecvdVal{only}, nil
}

103
func (c *Client) Provide(ctx context.Context, k key.Key) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
104
	defer log.EventBegin(ctx, "provide", &k).Done()
105
	msg := pb.NewMessage(pb.Message_ADD_PROVIDER, string(k), 0)
106 107
	// FIXME how is connectedness defined for the local node
	pri := []pb.PeerRoutingInfo{
rht's avatar
rht committed
108
		{
Jeromy's avatar
Jeromy committed
109
			PeerInfo: pstore.PeerInfo{
110 111
				ID:    c.local,
				Addrs: c.peerhost.Addrs(),
112 113 114 115 116
			},
		},
	}
	msg.ProviderPeers = pb.PeerRoutingInfosToPBPeers(pri)
	return c.proxy.SendMessage(ctx, msg) // TODO wrap to hide remote
117 118
}

Jeromy's avatar
Jeromy committed
119
func (c *Client) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
120
	defer log.EventBegin(ctx, "findPeer", id).Done()
121 122 123
	request := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
	response, err := c.proxy.SendRequest(ctx, request) // hide remote
	if err != nil {
Jeromy's avatar
Jeromy committed
124
		return pstore.PeerInfo{}, err
125 126 127 128 129 130
	}
	for _, p := range pb.PBPeersToPeerInfos(response.GetCloserPeers()) {
		if p.ID == id {
			return p, nil
		}
	}
Jeromy's avatar
Jeromy committed
131
	return pstore.PeerInfo{}, errors.New("could not find peer")
132 133 134
}

// creates and signs a record for the given key/value pair
Jeromy's avatar
Jeromy committed
135
func makeRecord(ps pstore.Peerstore, p peer.ID, k key.Key, v []byte) (*pb.Record, error) {
136 137 138 139 140 141 142 143 144 145 146 147 148 149
	blob := bytes.Join([][]byte{[]byte(k), v, []byte(p)}, []byte{})
	sig, err := ps.PrivKey(p).Sign(blob)
	if err != nil {
		return nil, err
	}
	return &pb.Record{
		Key:       proto.String(string(k)),
		Value:     v,
		Author:    proto.String(string(p)),
		Signature: sig,
	}, nil
}

func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
150
	defer log.EventBegin(ctx, "ping", id).Done()
151
	return time.Nanosecond, errors.New("supernode routing does not support the ping method")
152 153
}

154 155 156 157
// Bootstrap delegates to the proxy's Bootstrap and returns its result.
func (c *Client) Bootstrap(ctx context.Context) error {
	err := c.proxy.Bootstrap(ctx)
	return err
}

158
// Compile-time check that *Client satisfies routing.IpfsRouting.
var _ routing.IpfsRouting = (*Client)(nil)