protocol_messenger.go 5.83 KB
Newer Older
1
package dht_pb
2 3 4 5 6 7 8 9 10 11 12

import (
	"bytes"
	"context"
	"errors"
	"fmt"

	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/routing"

13
	logging "github.com/ipfs/go-log"
14 15 16 17
	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
	"github.com/multiformats/go-multihash"

18
	"github.com/libp2p/go-libp2p-kad-dht/internal"
19 20
)

21 22
// logger is the package-level logger, created under the name "dht".
var logger = logging.Logger("dht")

Adin Schmahmann's avatar
Adin Schmahmann committed
23 24 25 26
// ProtocolMessenger can be used for sending DHT messages to peers and processing their responses.
// This decouples the wire protocol format from both the DHT protocol implementation and from the implementation of the
// routing.Routing interface.
//
27 28
// Note: the ProtocolMessenger's MessageSender still needs to deal with some wire protocol details such as using
// varint-delineated protobufs
29
type ProtocolMessenger struct {
30
	m         MessageSender
31 32 33
	validator record.Validator
}

34 35 36 37 38 39 40 41 42
// ProtocolMessengerOption configures a ProtocolMessenger while it is being constructed.
// A non-nil error returned by an option makes NewProtocolMessenger fail with that error.
type ProtocolMessengerOption func(*ProtocolMessenger) error

// WithValidator returns an option that stores the given record.Validator on the ProtocolMessenger.
// The returned option always succeeds.
func WithValidator(validator record.Validator) ProtocolMessengerOption {
	apply := func(pm *ProtocolMessenger) error {
		pm.validator = validator
		return nil
	}
	return apply
}

Adin Schmahmann's avatar
Adin Schmahmann committed
43 44
// NewProtocolMessenger creates a new ProtocolMessenger that is used for sending DHT messages to peers and processing
// their responses.
45 46 47 48 49 50 51 52 53
func NewProtocolMessenger(msgSender MessageSender, opts ...ProtocolMessengerOption) (*ProtocolMessenger, error) {
	pm := &ProtocolMessenger{
		m: msgSender,
	}

	for _, o := range opts {
		if err := o(pm); err != nil {
			return nil, err
		}
54
	}
55 56 57 58 59 60 61

	return pm, nil
}

// MessageSender handles sending wire protocol messages to a given peer
type MessageSender interface {
	// SendRequest sends a peer a message and waits for its response
62
	SendRequest(ctx context.Context, p peer.ID, pmes *Message) (*Message, error)
63
	// SendMessage sends a peer a message without waiting on a response
64
	SendMessage(ctx context.Context, p peer.ID, pmes *Message) error
65 66
}

Adin Schmahmann's avatar
Adin Schmahmann committed
67
// PutValue asks a peer to store the given key/value pair.
68
func (pm *ProtocolMessenger) PutValue(ctx context.Context, p peer.ID, rec *recpb.Record) error {
69
	pmes := NewMessage(Message_PUT_VALUE, rec.Key, 0)
70
	pmes.Record = rec
71
	rpmes, err := pm.m.SendRequest(ctx, p, pmes)
72
	if err != nil {
73
		logger.Debugw("failed to put value to peer", "to", p, "key", internal.LoggableRecordKeyBytes(rec.Key), "error", err)
74 75 76 77
		return err
	}

	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
78 79 80
		const errStr = "value not put correctly"
		logger.Infow(errStr, "put-message", pmes, "get-message", rpmes)
		return errors.New(errStr)
81 82 83 84 85
	}

	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
86 87
// GetValue asks a peer for the value corresponding to the given key. Also returns the K closest peers to the key
// as described in GetClosestPeers.
88
func (pm *ProtocolMessenger) GetValue(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
89
	pmes := NewMessage(Message_GET_VALUE, []byte(key), 0)
90
	respMsg, err := pm.m.SendRequest(ctx, p, pmes)
91 92 93 94 95
	if err != nil {
		return nil, nil, err
	}

	// Perhaps we were given closer peers
96
	peers := PBPeersToPeerInfos(respMsg.GetCloserPeers())
97 98 99 100 101 102 103 104 105

	if rec := respMsg.GetRecord(); rec != nil {
		// Success! We were given the value
		logger.Debug("got value")

		// make sure record is valid.
		err = pm.validator.Validate(string(rec.GetKey()), rec.GetValue())
		if err != nil {
			logger.Debug("received invalid record (discarded)")
Adin Schmahmann's avatar
Adin Schmahmann committed
106
			// return a sentinel to signify an invalid record was received
107
			return nil, peers, internal.ErrInvalidRecord
108 109 110 111 112 113 114 115 116 117 118
		}
		return rec, peers, err
	}

	if len(peers) > 0 {
		return nil, peers, nil
	}

	return nil, nil, routing.ErrNotFound
}

Adin Schmahmann's avatar
Adin Schmahmann committed
119 120 121
// GetClosestPeers asks a peer to return the K (a DHT-wide parameter) DHT server peers closest in XOR space to the id
// Note: If the peer happens to know another peer whose peerID exactly matches the given id it will return that peer
// even if that peer is not a DHT server node.
122
func (pm *ProtocolMessenger) GetClosestPeers(ctx context.Context, p peer.ID, id peer.ID) ([]*peer.AddrInfo, error) {
123
	pmes := NewMessage(Message_FIND_NODE, []byte(id), 0)
124
	respMsg, err := pm.m.SendRequest(ctx, p, pmes)
125 126 127
	if err != nil {
		return nil, err
	}
128
	peers := PBPeersToPeerInfos(respMsg.GetCloserPeers())
129 130 131
	return peers, nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
132
// PutProvider asks a peer to store that we are a provider for the given key.
133 134 135 136 137 138
func (pm *ProtocolMessenger) PutProvider(ctx context.Context, p peer.ID, key multihash.Multihash, host host.Host) error {
	pi := peer.AddrInfo{
		ID:    host.ID(),
		Addrs: host.Addrs(),
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
139 140
	// TODO: We may want to limit the type of addresses in our provider records
	// For example, in a WAN-only DHT prohibit sharing non-WAN addresses (e.g. 192.168.0.100)
141 142 143 144
	if len(pi.Addrs) < 1 {
		return fmt.Errorf("no known addresses for self, cannot put provider")
	}

145 146
	pmes := NewMessage(Message_ADD_PROVIDER, key, 0)
	pmes.ProviderPeers = RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
147

148
	return pm.m.SendMessage(ctx, p, pmes)
149 150
}

Adin Schmahmann's avatar
Adin Schmahmann committed
151 152
// GetProviders asks a peer for the providers it knows of for a given key. Also returns the K closest peers to the key
// as described in GetClosestPeers.
153
func (pm *ProtocolMessenger) GetProviders(ctx context.Context, p peer.ID, key multihash.Multihash) ([]*peer.AddrInfo, []*peer.AddrInfo, error) {
154
	pmes := NewMessage(Message_GET_PROVIDERS, key, 0)
155
	respMsg, err := pm.m.SendRequest(ctx, p, pmes)
156 157 158
	if err != nil {
		return nil, nil, err
	}
159 160
	provs := PBPeersToPeerInfos(respMsg.GetProviderPeers())
	closerPeers := PBPeersToPeerInfos(respMsg.GetCloserPeers())
161 162 163 164 165
	return provs, closerPeers, nil
}

// Ping sends a ping message to the passed peer and waits for a response.
func (pm *ProtocolMessenger) Ping(ctx context.Context, p peer.ID) error {
166
	req := NewMessage(Message_PING, nil, 0)
167
	resp, err := pm.m.SendRequest(ctx, p, req)
168 169 170
	if err != nil {
		return fmt.Errorf("sending request: %w", err)
	}
171
	if resp.Type != Message_PING {
172 173 174 175
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
	}
	return nil
}