client.go 4.77 KB
Newer Older
1
package supernode
2 3 4

import (
	"bytes"
5
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"errors"
7 8
	"time"

9
	proxy "github.com/ipfs/go-ipfs/routing/supernode/proxy"
Jeromy's avatar
Jeromy committed
10

11
	pstore "gx/ipfs/QmQMQ2RUjnaEEX8ybmrhuFFGhAwPjyL1Eo6ZoJGD7aAccM/go-libp2p-peerstore"
Jeromy's avatar
Jeromy committed
12
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
13
	loggables "gx/ipfs/QmTcfnDHimxBJqx6utpnWqVHdvyquXgkwAvYt4zMaJMKS2/go-libp2p-loggables"
14
	dhtpb "gx/ipfs/QmUpZqxzrUoyDsgWXDri9yYgi5r5EK7J5Tan1MbgnawYLx/go-libp2p-kad-dht/pb"
15
	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
Jeromy's avatar
Jeromy committed
16
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
17 18 19 20
	peer "gx/ipfs/QmZcUPvPhD1Xvk6mwijYF8AfR3mG31S1YsEfHG4khrFPRr/go-libp2p-peer"
	routing "gx/ipfs/QmZghcVHwXQC3Zvnvn24LgTmSPkEn2o3PDyKb6nrtPRzRh/go-libp2p-routing"
	pb "gx/ipfs/QmZp9q8DbrGLztoxpkTC62mnRayRwHcAzGJJ8AvYRwjanR/go-libp2p-record/pb"
	"gx/ipfs/QmbzbRyd22gcW92U1rA2yKagB3myMYhk45XBknJ49F9XWJ/go-libp2p-host"
21 22
)

Jeromy's avatar
Jeromy committed
23
// log is the package-level logger, registered under the "supernode" name.
var log = logging.Logger("supernode")
24 25

type Client struct {
26
	peerhost  host.Host
Jeromy's avatar
Jeromy committed
27
	peerstore pstore.Peerstore
28 29 30 31 32
	proxy     proxy.Proxy
	local     peer.ID
}

// TODO take in datastore/cache
Jeromy's avatar
Jeromy committed
33
func NewClient(px proxy.Proxy, h host.Host, ps pstore.Peerstore, local peer.ID) (*Client, error) {
34 35 36 37
	return &Client{
		proxy:     px,
		local:     local,
		peerstore: ps,
38
		peerhost:  h,
39 40 41
	}, nil
}

42
func (c *Client) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan pstore.PeerInfo {
43
	logging.ContextWithLoggable(ctx, loggables.Uuid("findProviders"))
44
	defer log.EventBegin(ctx, "findProviders", k).Done()
Jeromy's avatar
Jeromy committed
45
	ch := make(chan pstore.PeerInfo)
46 47
	go func() {
		defer close(ch)
48
		request := dhtpb.NewMessage(dhtpb.Message_GET_PROVIDERS, k.KeyString(), 0)
49 50
		response, err := c.proxy.SendRequest(ctx, request)
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51
			log.Debug(err)
52 53
			return
		}
54
		for _, p := range dhtpb.PBPeersToPeerInfos(response.GetProviderPeers()) {
55 56
			select {
			case <-ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57
				log.Debug(ctx.Err())
58 59 60 61 62 63 64 65
				return
			case ch <- p:
			}
		}
	}()
	return ch
}

66 67
func (c *Client) PutValue(ctx context.Context, k string, v []byte) error {
	defer log.EventBegin(ctx, "putValue").Done()
68 69 70 71
	r, err := makeRecord(c.peerstore, c.local, k, v)
	if err != nil {
		return err
	}
72
	pmes := dhtpb.NewMessage(dhtpb.Message_PUT_VALUE, string(k), 0)
73 74 75 76
	pmes.Record = r
	return c.proxy.SendMessage(ctx, pmes) // wrap to hide the remote
}

77 78
func (c *Client) GetValue(ctx context.Context, k string) ([]byte, error) {
	defer log.EventBegin(ctx, "getValue").Done()
79
	msg := dhtpb.NewMessage(dhtpb.Message_GET_VALUE, string(k), 0)
80 81
	response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82
		return nil, err
83 84 85 86
	}
	return response.Record.GetValue(), nil
}

87 88
func (c *Client) GetValues(ctx context.Context, k string, _ int) ([]routing.RecvdVal, error) {
	defer log.EventBegin(ctx, "getValue").Done()
89
	msg := dhtpb.NewMessage(dhtpb.Message_GET_VALUE, string(k), 0)
90 91 92 93 94 95 96 97 98 99 100 101 102
	response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
	if err != nil {
		return nil, err
	}

	return []routing.RecvdVal{
		{
			Val:  response.Record.GetValue(),
			From: c.local,
		},
	}, nil
}

103 104 105
func (c *Client) Provide(ctx context.Context, k *cid.Cid) error {
	defer log.EventBegin(ctx, "provide", k).Done()
	msg := dhtpb.NewMessage(dhtpb.Message_ADD_PROVIDER, k.KeyString(), 0)
106
	// FIXME how is connectedness defined for the local node
107
	pri := []dhtpb.PeerRoutingInfo{
rht's avatar
rht committed
108
		{
Jeromy's avatar
Jeromy committed
109
			PeerInfo: pstore.PeerInfo{
110 111
				ID:    c.local,
				Addrs: c.peerhost.Addrs(),
112 113 114
			},
		},
	}
115
	msg.ProviderPeers = dhtpb.PeerRoutingInfosToPBPeers(pri)
116
	return c.proxy.SendMessage(ctx, msg) // TODO wrap to hide remote
117 118
}

Jeromy's avatar
Jeromy committed
119
func (c *Client) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
120
	defer log.EventBegin(ctx, "findPeer", id).Done()
121
	request := dhtpb.NewMessage(dhtpb.Message_FIND_NODE, string(id), 0)
122 123
	response, err := c.proxy.SendRequest(ctx, request) // hide remote
	if err != nil {
Jeromy's avatar
Jeromy committed
124
		return pstore.PeerInfo{}, err
125
	}
126
	for _, p := range dhtpb.PBPeersToPeerInfos(response.GetCloserPeers()) {
127 128 129 130
		if p.ID == id {
			return p, nil
		}
	}
Jeromy's avatar
Jeromy committed
131
	return pstore.PeerInfo{}, errors.New("could not find peer")
132 133 134
}

// creates and signs a record for the given key/value pair
135
func makeRecord(ps pstore.Peerstore, p peer.ID, k string, v []byte) (*pb.Record, error) {
136 137 138 139 140 141 142 143 144 145 146 147 148 149
	blob := bytes.Join([][]byte{[]byte(k), v, []byte(p)}, []byte{})
	sig, err := ps.PrivKey(p).Sign(blob)
	if err != nil {
		return nil, err
	}
	return &pb.Record{
		Key:       proto.String(string(k)),
		Value:     v,
		Author:    proto.String(string(p)),
		Signature: sig,
	}, nil
}

func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
150
	defer log.EventBegin(ctx, "ping", id).Done()
151
	return time.Nanosecond, errors.New("supernode routing does not support the ping method")
152 153
}

154 155 156 157
// Bootstrap delegates to the proxy's Bootstrap with the same context and
// returns its error unchanged.
func (c *Client) Bootstrap(ctx context.Context) error {
	return c.proxy.Bootstrap(ctx)
}

158
// Compile-time check that *Client satisfies routing.IpfsRouting.
var _ routing.IpfsRouting = (*Client)(nil)