client.go 5.01 KB
Newer Older
1
package supernode
2 3 4

import (
	"bytes"
5
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"errors"
7 8
	"time"

9
	proxy "github.com/ipfs/go-ipfs/routing/supernode/proxy"
Jeromy's avatar
Jeromy committed
10

11 12
	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
	routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
13
	pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
Jeromy's avatar
Jeromy committed
14
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
15
	loggables "gx/ipfs/QmT4PgCNdv73hnFAqzHqwW44q7M9PWpykSswHDxndquZbc/go-libp2p-loggables"
Steven Allen's avatar
Steven Allen committed
16
	dhtpb "gx/ipfs/QmXF6hA3AWGNmE33GarqeCu3ksAXhLiwhcRR4mvfyWTZcT/go-libp2p-kad-dht/pb"
17
	peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
18
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
Steven Allen's avatar
Steven Allen committed
19
	"gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host"
20
	pb "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record/pb"
21 22
)

Jeromy's avatar
Jeromy committed
23
// log is the package-level logger, created under the name "supernode".
var log = logging.Logger("supernode")
24 25

type Client struct {
26
	peerhost  host.Host
Jeromy's avatar
Jeromy committed
27
	peerstore pstore.Peerstore
28 29 30 31 32
	proxy     proxy.Proxy
	local     peer.ID
}

// TODO take in datastore/cache
Jeromy's avatar
Jeromy committed
33
func NewClient(px proxy.Proxy, h host.Host, ps pstore.Peerstore, local peer.ID) (*Client, error) {
34 35 36 37
	return &Client{
		proxy:     px,
		local:     local,
		peerstore: ps,
38
		peerhost:  h,
39 40 41
	}, nil
}

42
func (c *Client) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan pstore.PeerInfo {
43
	logging.ContextWithLoggable(ctx, loggables.Uuid("findProviders"))
44
	defer log.EventBegin(ctx, "findProviders", k).Done()
Jeromy's avatar
Jeromy committed
45
	ch := make(chan pstore.PeerInfo)
46 47
	go func() {
		defer close(ch)
48
		request := dhtpb.NewMessage(dhtpb.Message_GET_PROVIDERS, k.KeyString(), 0)
49 50
		response, err := c.proxy.SendRequest(ctx, request)
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51
			log.Debug(err)
52 53
			return
		}
54
		for _, p := range dhtpb.PBPeersToPeerInfos(response.GetProviderPeers()) {
55 56
			select {
			case <-ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57
				log.Debug(ctx.Err())
58
				return
59
			case ch <- *p:
60 61 62 63 64 65
			}
		}
	}()
	return ch
}

66 67
func (c *Client) PutValue(ctx context.Context, k string, v []byte) error {
	defer log.EventBegin(ctx, "putValue").Done()
68 69 70 71
	r, err := makeRecord(c.peerstore, c.local, k, v)
	if err != nil {
		return err
	}
72
	pmes := dhtpb.NewMessage(dhtpb.Message_PUT_VALUE, string(k), 0)
73 74 75 76
	pmes.Record = r
	return c.proxy.SendMessage(ctx, pmes) // wrap to hide the remote
}

77 78
func (c *Client) GetValue(ctx context.Context, k string) ([]byte, error) {
	defer log.EventBegin(ctx, "getValue").Done()
79
	msg := dhtpb.NewMessage(dhtpb.Message_GET_VALUE, string(k), 0)
80 81
	response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82
		return nil, err
83 84 85 86
	}
	return response.Record.GetValue(), nil
}

87 88
func (c *Client) GetValues(ctx context.Context, k string, _ int) ([]routing.RecvdVal, error) {
	defer log.EventBegin(ctx, "getValue").Done()
89
	msg := dhtpb.NewMessage(dhtpb.Message_GET_VALUE, string(k), 0)
90 91 92 93 94 95 96 97 98 99 100 101 102
	response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
	if err != nil {
		return nil, err
	}

	return []routing.RecvdVal{
		{
			Val:  response.Record.GetValue(),
			From: c.local,
		},
	}, nil
}

103 104 105 106 107 108 109
// Provide adds the given key 'k' to the content routing system. If 'brd' is
// true, it announces that content to the network.  For the supernode client,
// setting 'brd' to false makes this call a no-op
func (c *Client) Provide(ctx context.Context, k *cid.Cid, brd bool) error {
	if !brd {
		return nil
	}
110 111
	defer log.EventBegin(ctx, "provide", k).Done()
	msg := dhtpb.NewMessage(dhtpb.Message_ADD_PROVIDER, k.KeyString(), 0)
112
	// FIXME how is connectedness defined for the local node
113
	pri := []dhtpb.PeerRoutingInfo{
rht's avatar
rht committed
114
		{
Jeromy's avatar
Jeromy committed
115
			PeerInfo: pstore.PeerInfo{
116 117
				ID:    c.local,
				Addrs: c.peerhost.Addrs(),
118 119 120
			},
		},
	}
121
	msg.ProviderPeers = dhtpb.PeerRoutingInfosToPBPeers(pri)
122
	return c.proxy.SendMessage(ctx, msg) // TODO wrap to hide remote
123 124
}

Jeromy's avatar
Jeromy committed
125
func (c *Client) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
126
	defer log.EventBegin(ctx, "findPeer", id).Done()
127
	request := dhtpb.NewMessage(dhtpb.Message_FIND_NODE, string(id), 0)
128 129
	response, err := c.proxy.SendRequest(ctx, request) // hide remote
	if err != nil {
Jeromy's avatar
Jeromy committed
130
		return pstore.PeerInfo{}, err
131
	}
132
	for _, p := range dhtpb.PBPeersToPeerInfos(response.GetCloserPeers()) {
133
		if p.ID == id {
134
			return *p, nil
135 136
		}
	}
Jeromy's avatar
Jeromy committed
137
	return pstore.PeerInfo{}, errors.New("could not find peer")
138 139 140
}

// creates and signs a record for the given key/value pair
141
func makeRecord(ps pstore.Peerstore, p peer.ID, k string, v []byte) (*pb.Record, error) {
142 143 144 145 146 147 148 149 150 151 152 153 154 155
	blob := bytes.Join([][]byte{[]byte(k), v, []byte(p)}, []byte{})
	sig, err := ps.PrivKey(p).Sign(blob)
	if err != nil {
		return nil, err
	}
	return &pb.Record{
		Key:       proto.String(string(k)),
		Value:     v,
		Author:    proto.String(string(p)),
		Signature: sig,
	}, nil
}

func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
156
	defer log.EventBegin(ctx, "ping", id).Done()
157
	return time.Nanosecond, errors.New("supernode routing does not support the ping method")
158 159
}

160 161 162 163
// Bootstrap delegates to the proxy's Bootstrap and returns its result.
func (c *Client) Bootstrap(ctx context.Context) error {
	err := c.proxy.Bootstrap(ctx)
	return err
}

164
// Compile-time check that *Client implements routing.IpfsRouting.
var _ routing.IpfsRouting = (*Client)(nil)