handlers.go 8.86 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6
	"errors"
	"fmt"
7
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8

9
	proto "github.com/gogo/protobuf/proto"
10
	cid "github.com/ipfs/go-cid"
George Antoniadis's avatar
George Antoniadis committed
11
	ds "github.com/ipfs/go-datastore"
12
	u "github.com/ipfs/go-ipfs-util"
George Antoniadis's avatar
George Antoniadis committed
13
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
14 15 16
	lgbl "github.com/libp2p/go-libp2p-loggables"
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
17
	recpb "github.com/libp2p/go-libp2p-record/pb"
18
	base32 "github.com/whyrusleeping/base32"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20
)

Jeromy's avatar
Jeromy committed
21
// CloserPeerCount is the number of closer peers to send on requests.
Jeromy's avatar
Jeromy committed
22
// Initialized from KValue, which is declared elsewhere in the package. It is a
// package-level variable rather than a constant, so it can be reassigned.
var CloserPeerCount = KValue
23

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24
// dhtHandler specifies the signature of functions that handle DHT messages.
25
// A handler receives a context, a peer.ID (the handlers in this file treat it
// as the ID of the peer that sent the request) and the request message. It
// returns a response message, which may be nil, and an error.
type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26

27
func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
	switch t {
29
	case pb.Message_GET_VALUE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30
		return dht.handleGetValue
31
	case pb.Message_PUT_VALUE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32
		return dht.handlePutValue
33
	case pb.Message_FIND_NODE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34
		return dht.handleFindPeer
35
	case pb.Message_ADD_PROVIDER:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36
		return dht.handleAddProvider
37
	case pb.Message_GET_PROVIDERS:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38
		return dht.handleGetProviders
39
	case pb.Message_PING:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40 41 42 43 44 45
		return dht.handlePing
	default:
		return nil
	}
}

46
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
47
	defer log.EventBegin(ctx, "handleGetValue", p).Done()
Jeromy's avatar
Jeromy committed
48
	log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49 50

	// setup response
51
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52

53
	// first, is there even a key?
54
	k := pmes.GetKey()
55
	if k == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
56
		return nil, errors.New("handleGetValue but no key was provided")
57
		// TODO: send back an error response? could be bad, but the other node's hanging.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58 59
	}

60 61
	rec, err := dht.checkLocalDatastore(k)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62 63
		return nil, err
	}
64
	resp.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65 66

	// Find closest peer on given cluster to desired key and reply with that info
67
	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Jeromy's avatar
Jeromy committed
68
	if len(closer) > 0 {
Jeromy's avatar
Jeromy committed
69
		closerinfos := pstore.PeerInfos(dht.peerstore, closer)
70 71 72
		for _, pi := range closerinfos {
			log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
			if len(pi.Addrs) < 1 {
73
				log.Warningf(`no addresses on peer being sent!
74 75 76
					[local:%s]
					[sending:%s]
					[remote:%s]`, dht.self, pi.ID, p)
Jeromy's avatar
Jeromy committed
77
			}
78
		}
79

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81 82 83 84 85
	}

	return resp, nil
}

86
func (dht *IpfsDHT) checkLocalDatastore(k string) (*recpb.Record, error) {
87
	log.Debugf("%s handleGetValue looking into ds", dht.self)
88
	dskey := convertToDsKey(k)
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
	iVal, err := dht.datastore.Get(dskey)
	log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)

	if err == ds.ErrNotFound {
		return nil, nil
	}

	// if we got an unexpected error, bail.
	if err != nil {
		return nil, err
	}

	// if we have the value, send it back
	log.Debugf("%s handleGetValue success!", dht.self)

	byts, ok := iVal.([]byte)
	if !ok {
		return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
	}

George Antoniadis's avatar
George Antoniadis committed
109
	rec := new(recpb.Record)
110 111
	err = proto.Unmarshal(byts, rec)
	if err != nil {
112
		log.Debug("failed to unmarshal DHT record from datastore")
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
		return nil, err
	}

	// if its our record, dont bother checking the times on it
	if peer.ID(rec.GetAuthor()) == dht.self {
		return rec, nil
	}

	var recordIsBad bool
	recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
	if err != nil {
		log.Info("either no receive time set on record, or it was invalid: ", err)
		recordIsBad = true
	}

	if time.Now().Sub(recvtime) > MaxRecordAge {
		log.Debug("old record found, tossing.")
		recordIsBad = true
	}

133
	// NOTE: We do not verify the record here beyond checking these timestamps.
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
	// we put the burden of checking the records on the requester as checking a record
	// may be computationally expensive

	if recordIsBad {
		err := dht.datastore.Delete(dskey)
		if err != nil {
			log.Error("Failed to delete bad record from datastore: ", err)
		}

		return nil, nil // can treat this as not having the record at all
	}

	return rec, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149
// Store a value in this peer local storage
150
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
151
	defer log.EventBegin(ctx, "handlePutValue", p).Done()
152
	dskey := convertToDsKey(pmes.GetKey())
153

154 155 156 157 158 159 160
	rec := pmes.GetRecord()
	if rec == nil {
		log.Infof("Got nil record from: %s", p.Pretty())
		return nil, errors.New("nil record")
	}

	if err := dht.verifyRecordLocally(rec); err != nil {
161
		log.Warningf("Bad dht record in PUT from: %s. %s", peer.ID(pmes.GetRecord().GetAuthor()), err)
162 163 164
		return nil, err
	}

165 166 167 168
	// record the time we receive every record
	rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))

	data, err := proto.Marshal(rec)
169 170 171 172 173
	if err != nil {
		return nil, err
	}

	err = dht.datastore.Put(dskey, data)
Jeromy's avatar
Jeromy committed
174
	log.Debugf("%s handlePutValue %v", dht.self, dskey)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
175
	return pmes, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
176 177
}

178
func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
179
	log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
180
	return pmes, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181 182
}

183
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
184
	defer log.EventBegin(ctx, "handleFindPeer", p).Done()
185
	resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
186
	var closest []peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187 188

	// if looking for self... special case where we send it on CloserPeers.
189 190
	if peer.ID(pmes.GetKey()) == dht.self {
		closest = []peer.ID{dht.self}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191
	} else {
192
		closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193 194 195
	}

	if closest == nil {
196
		log.Infof("%s handleFindPeer %s: could not find anything.", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
197 198 199
		return resp, nil
	}

Jeromy's avatar
Jeromy committed
200 201
	var withAddresses []pstore.PeerInfo
	closestinfos := pstore.PeerInfos(dht.peerstore, closest)
202 203 204 205
	for _, pi := range closestinfos {
		if len(pi.Addrs) > 0 {
			withAddresses = append(withAddresses, pi)
			log.Debugf("handleFindPeer: sending back '%s'", pi.ID)
206
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207 208
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209
	resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), withAddresses)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
210 211 212
	return resp, nil
}

213
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214 215 216 217
	lm := make(lgbl.DeferredMap)
	lm["peer"] = func() interface{} { return p.Pretty() }
	defer log.EventBegin(ctx, "handleGetProviders", lm).Done()

218
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
219 220 221 222 223 224
	c, err := cid.Cast([]byte(pmes.GetKey()))
	if err != nil {
		return nil, err
	}

	lm["key"] = func() interface{} { return c.String() }
225 226

	// debug logging niceness.
227
	reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, c)
228 229
	log.Debugf("%s begin", reqDesc)
	defer log.Debugf("%s end", reqDesc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
230 231

	// check if we have this value, to add ourselves as provider.
232
	has, err := dht.datastore.Has(convertToDsKey(c.KeyString()))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
233
	if err != nil && err != ds.ErrNotFound {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234
		log.Debugf("unexpected datastore error: %v\n", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
235 236 237 238
		has = false
	}

	// setup providers
239
	providers := dht.providers.GetProviders(ctx, c)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
240 241
	if has {
		providers = append(providers, dht.self)
242
		log.Debugf("%s have the value. added self as provider", reqDesc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
243 244 245
	}

	if providers != nil && len(providers) > 0 {
Jeromy's avatar
Jeromy committed
246
		infos := pstore.PeerInfos(dht.peerstore, providers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
247
		resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
248
		log.Debugf("%s have %d providers: %s", reqDesc, len(providers), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
249 250 251
	}

	// Also send closer peers.
252
	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
253
	if closer != nil {
Jeromy's avatar
Jeromy committed
254
		infos := pstore.PeerInfos(dht.peerstore, closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
255
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
256
		log.Debugf("%s have %d closer peers: %s", reqDesc, len(closer), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
257 258 259 260 261
	}

	return resp, nil
}

262
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
263 264 265 266
	lm := make(lgbl.DeferredMap)
	lm["peer"] = func() interface{} { return p.Pretty() }

	defer log.EventBegin(ctx, "handleAddProvider", lm).Done()
267 268 269 270 271 272
	c, err := cid.Cast([]byte(pmes.GetKey()))
	if err != nil {
		return nil, err
	}

	lm["key"] = func() interface{} { return c.String() }
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
273

274
	log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, c)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
275

276
	// add provider should use the address given in the message
277 278 279 280 281
	pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
	for _, pi := range pinfos {
		if pi.ID != p {
			// we should ignore this provider reccord! not from originator.
			// (we chould sign them and check signature later...)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
282
			log.Debugf("handleAddProvider received provider %s from %s. Ignore.", pi.ID, p)
283 284
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
285

286
		if len(pi.Addrs) < 1 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
287
			log.Debugf("%s got no valid addresses for provider %s. Ignore.", dht.self, p)
288 289
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
290

291
		log.Infof("received provider %s for %s (addrs: %s)", p, c, pi.Addrs)
292
		if pi.ID != dht.self { // dont add own addrs.
293
			// add the received addresses to our peerstore.
Jeromy's avatar
Jeromy committed
294
			dht.peerstore.AddAddrs(pi.ID, pi.Addrs, pstore.ProviderAddrTTL)
295
		}
296
		dht.providers.AddProvider(ctx, c, p)
297 298
	}

Jeromy's avatar
Jeromy committed
299
	return nil, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
300
}
301 302 303 304

// convertToDsKey builds the datastore key for s: the bytes of s are encoded
// with base32.RawStdEncoding and the resulting string is passed to ds.NewKey.
func convertToDsKey(s string) ds.Key {
	encoded := base32.RawStdEncoding.EncodeToString([]byte(s))
	return ds.NewKey(encoded)
}