package dht

import (
	"bytes"
	"context"
	"errors"
	"fmt"

	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/routing"

	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
	"github.com/multiformats/go-multihash"

	"github.com/libp2p/go-libp2p-kad-dht/internal"
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
)

Adin Schmahmann's avatar
Adin Schmahmann committed
21 22 23 24 25 26
// ProtocolMessenger can be used for sending DHT messages to peers and processing their responses.
// This decouples the wire protocol format from both the DHT protocol implementation and from the implementation of the
// routing.Routing interface.
//
// TODO: This is still strongly coupled with the existing implementation of what happens when a peer actually sends a
// message on the wire (e.g. reusing streams, reusing connections, metrics tracking, etc.).
27
type ProtocolMessenger struct {
28
	m         MessageSender
29 30 31
	validator record.Validator
}

32 33 34 35 36 37 38 39 40
type ProtocolMessengerOption func(*ProtocolMessenger) error

// WithValidator returns an option that installs validator as the record validator of the
// ProtocolMessenger being configured. The returned option never fails.
func WithValidator(validator record.Validator) ProtocolMessengerOption {
	setValidator := func(pm *ProtocolMessenger) error {
		pm.validator = validator
		return nil
	}
	return setValidator
}

Adin Schmahmann's avatar
Adin Schmahmann committed
41 42
// NewProtocolMessenger creates a new ProtocolMessenger that is used for sending DHT messages to peers and processing
// their responses.
43 44 45 46 47 48 49 50 51
func NewProtocolMessenger(msgSender MessageSender, opts ...ProtocolMessengerOption) (*ProtocolMessenger, error) {
	pm := &ProtocolMessenger{
		m: msgSender,
	}

	for _, o := range opts {
		if err := o(pm); err != nil {
			return nil, err
		}
52
	}
53 54 55 56 57 58 59 60 61 62

	return pm, nil
}

// MessageSender handles sending wire protocol messages to a given peer
type MessageSender interface {
	// SendRequest sends a peer a message and waits for its response
	SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error)
	// SendMessage sends a peer a message without waiting on a response
	SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error
63 64
}

Adin Schmahmann's avatar
Adin Schmahmann committed
65
// PutValue asks a peer to store the given key/value pair.
66 67 68
func (pm *ProtocolMessenger) PutValue(ctx context.Context, p peer.ID, rec *recpb.Record) error {
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
	pmes.Record = rec
69
	rpmes, err := pm.m.SendRequest(ctx, p, pmes)
70
	if err != nil {
71
		logger.Debugw("failed to put value to peer", "to", p, "key", internal.LoggableRecordKeyBytes(rec.Key), "error", err)
72 73 74 75 76 77 78 79 80 81 82
		return err
	}

	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
		logger.Infow("value not put correctly", "put-message", pmes, "get-message", rpmes)
		return errors.New("value not put correctly")
	}

	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
83 84
// GetValue asks a peer for the value corresponding to the given key. Also returns the K closest peers to the key
// as described in GetClosestPeers.
85 86
func (pm *ProtocolMessenger) GetValue(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
87
	respMsg, err := pm.m.SendRequest(ctx, p, pmes)
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
	if err != nil {
		return nil, nil, err
	}

	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(respMsg.GetCloserPeers())

	if rec := respMsg.GetRecord(); rec != nil {
		// Success! We were given the value
		logger.Debug("got value")

		// make sure record is valid.
		err = pm.validator.Validate(string(rec.GetKey()), rec.GetValue())
		if err != nil {
			logger.Debug("received invalid record (discarded)")
Adin Schmahmann's avatar
Adin Schmahmann committed
103
			// return a sentinel to signify an invalid record was received
104 105 106 107 108 109 110 111 112 113 114 115 116
			err = errInvalidRecord
			rec = new(recpb.Record)
		}
		return rec, peers, err
	}

	if len(peers) > 0 {
		return nil, peers, nil
	}

	return nil, nil, routing.ErrNotFound
}

Adin Schmahmann's avatar
Adin Schmahmann committed
117 118 119
// GetClosestPeers asks a peer to return the K (a DHT-wide parameter) DHT server peers closest in XOR space to the id
// Note: If the peer happens to know another peer whose peerID exactly matches the given id it will return that peer
// even if that peer is not a DHT server node.
120 121
func (pm *ProtocolMessenger) GetClosestPeers(ctx context.Context, p peer.ID, id peer.ID) ([]*peer.AddrInfo, error) {
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
122
	respMsg, err := pm.m.SendRequest(ctx, p, pmes)
123 124 125 126 127 128 129
	if err != nil {
		return nil, err
	}
	peers := pb.PBPeersToPeerInfos(respMsg.GetCloserPeers())
	return peers, nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
130
// PutProvider asks a peer to store that we are a provider for the given key.
131 132 133 134 135 136
func (pm *ProtocolMessenger) PutProvider(ctx context.Context, p peer.ID, key multihash.Multihash, host host.Host) error {
	pi := peer.AddrInfo{
		ID:    host.ID(),
		Addrs: host.Addrs(),
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
137 138
	// TODO: We may want to limit the type of addresses in our provider records
	// For example, in a WAN-only DHT prohibit sharing non-WAN addresses (e.g. 192.168.0.100)
139 140 141 142 143 144 145
	if len(pi.Addrs) < 1 {
		return fmt.Errorf("no known addresses for self, cannot put provider")
	}

	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})

146
	return pm.m.SendMessage(ctx, p, pmes)
147 148
}

Adin Schmahmann's avatar
Adin Schmahmann committed
149 150
// GetProviders asks a peer for the providers it knows of for a given key. Also returns the K closest peers to the key
// as described in GetClosestPeers.
151 152
func (pm *ProtocolMessenger) GetProviders(ctx context.Context, p peer.ID, key multihash.Multihash) ([]*peer.AddrInfo, []*peer.AddrInfo, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
153
	respMsg, err := pm.m.SendRequest(ctx, p, pmes)
154 155 156 157 158 159 160 161 162 163 164
	if err != nil {
		return nil, nil, err
	}
	provs := pb.PBPeersToPeerInfos(respMsg.GetProviderPeers())
	closerPeers := pb.PBPeersToPeerInfos(respMsg.GetCloserPeers())
	return provs, closerPeers, nil
}

// Ping sends a ping message to the passed peer and waits for a response.
func (pm *ProtocolMessenger) Ping(ctx context.Context, p peer.ID) error {
	req := pb.NewMessage(pb.Message_PING, nil, 0)
165
	resp, err := pm.m.SendRequest(ctx, p, req)
166 167 168 169 170 171 172 173
	if err != nil {
		return fmt.Errorf("sending request: %w", err)
	}
	if resp.Type != pb.Message_PING {
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
	}
	return nil
}