handlers.go 10.6 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6 7
	"errors"
	"fmt"
8
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9

10 11 12 13
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	pstore "github.com/libp2p/go-libp2p-peerstore"

14
	"github.com/gogo/protobuf/proto"
George Antoniadis's avatar
George Antoniadis committed
15
	ds "github.com/ipfs/go-datastore"
16
	u "github.com/ipfs/go-ipfs-util"
17
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
George Antoniadis's avatar
George Antoniadis committed
18
	recpb "github.com/libp2p/go-libp2p-record/pb"
Steven Allen's avatar
Steven Allen committed
19
	"github.com/multiformats/go-base32"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21 22
)

// dhthandler specifies the signature of functions that handle DHT messages.
23
type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24

25
func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26
	switch t {
27
	case pb.Message_FIND_NODE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
		return dht.handleFindPeer
29
	case pb.Message_PING:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30 31
		return dht.handlePing
	}
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51

	if dht.enableValues {
		switch t {
		case pb.Message_GET_VALUE:
			return dht.handleGetValue
		case pb.Message_PUT_VALUE:
			return dht.handlePutValue
		}
	}

	if dht.enableProviders {
		switch t {
		case pb.Message_ADD_PROVIDER:
			return dht.handleAddProvider
		case pb.Message_GET_PROVIDERS:
			return dht.handleGetProviders
		}
	}

	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52 53
}

ForrestWeston's avatar
ForrestWeston committed
54
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) {
55
	// first, is there even a key?
56
	k := pmes.GetKey()
57
	if len(k) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58 59 60
		return nil, errors.New("handleGetValue but no key was provided")
	}

Steven Allen's avatar
Steven Allen committed
61 62 63
	// setup response
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

64 65
	rec, err := dht.checkLocalDatastore(k)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66 67
		return nil, err
	}
68
	resp.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69 70

	// Find closest peer on given cluster to desired key and reply with that info
71
	closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
Jeromy's avatar
Jeromy committed
72
	if len(closer) > 0 {
73
		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
Jeromy's avatar
Jeromy committed
74
		closerinfos := pstore.PeerInfos(dht.peerstore, closer)
75
		for _, pi := range closerinfos {
Matt Joiner's avatar
Matt Joiner committed
76
			logger.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
77
			if len(pi.Addrs) < 1 {
Steven Allen's avatar
Steven Allen committed
78 79 80 81 82
				logger.Warnw("no addresses on peer being sent",
					"local", dht.self,
					"to", p,
					"sending", pi.ID,
				)
Jeromy's avatar
Jeromy committed
83
			}
84
		}
85

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87 88 89 90 91
	}

	return resp, nil
}

92
func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
Matt Joiner's avatar
Matt Joiner committed
93
	logger.Debugf("%s handleGetValue looking into ds", dht.self)
94
	dskey := convertToDsKey(k)
95
	buf, err := dht.datastore.Get(dskey)
Matt Joiner's avatar
Matt Joiner committed
96
	logger.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, buf)
97 98 99 100 101 102 103 104 105 106 107

	if err == ds.ErrNotFound {
		return nil, nil
	}

	// if we got an unexpected error, bail.
	if err != nil {
		return nil, err
	}

	// if we have the value, send it back
Matt Joiner's avatar
Matt Joiner committed
108
	logger.Debugf("%s handleGetValue success!", dht.self)
109

George Antoniadis's avatar
George Antoniadis committed
110
	rec := new(recpb.Record)
111
	err = proto.Unmarshal(buf, rec)
112
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
113
		logger.Debug("failed to unmarshal DHT record from datastore")
114 115 116 117 118 119
		return nil, err
	}

	var recordIsBad bool
	recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
120
		logger.Info("either no receive time set on record, or it was invalid: ", err)
121 122 123
		recordIsBad = true
	}

Aarsh Shah's avatar
Aarsh Shah committed
124
	if time.Since(recvtime) > dht.maxRecordAge {
Matt Joiner's avatar
Matt Joiner committed
125
		logger.Debug("old record found, tossing.")
126 127 128
		recordIsBad = true
	}

129
	// NOTE: We do not verify the record here beyond checking these timestamps.
130 131 132 133 134 135
	// we put the burden of checking the records on the requester as checking a record
	// may be computationally expensive

	if recordIsBad {
		err := dht.datastore.Delete(dskey)
		if err != nil {
Matt Joiner's avatar
Matt Joiner committed
136
			logger.Error("Failed to delete bad record from datastore: ", err)
137 138 139 140 141 142 143 144
		}

		return nil, nil // can treat this as not having the record at all
	}

	return rec, nil
}

145 146
// Cleans the record (to avoid storing arbitrary data).
func cleanRecord(rec *recpb.Record) {
147
	rec.TimeReceived = ""
148 149
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150
// Store a value in this peer local storage
ForrestWeston's avatar
ForrestWeston committed
151
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) {
Steven Allen's avatar
Steven Allen committed
152 153 154 155
	if len(pmes.GetKey()) == 0 {
		return nil, errors.New("handleGetValue but no key was provided")
	}

156 157
	rec := pmes.GetRecord()
	if rec == nil {
Steven Allen's avatar
Steven Allen committed
158
		logger.Debugw("got nil record from", "from", p)
159 160
		return nil, errors.New("nil record")
	}
161

162
	if !bytes.Equal(pmes.GetKey(), rec.GetKey()) {
163 164 165
		return nil, errors.New("put key doesn't match record key")
	}

166
	cleanRecord(rec)
167

168
	// Make sure the record is valid (not expired, valid signature etc)
169
	if err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue()); err != nil {
Steven Allen's avatar
Steven Allen committed
170
		logger.Infow("bad dht record in PUT", "from", p, "key", rec.GetKey(), "error", err)
171 172 173
		return nil, err
	}

174 175
	dskey := convertToDsKey(rec.GetKey())

176 177 178 179 180 181 182 183 184 185 186
	// fetch the striped lock for this key
	var indexForLock byte
	if len(rec.GetKey()) == 0 {
		indexForLock = 0
	} else {
		indexForLock = rec.GetKey()[len(rec.GetKey())-1]
	}
	lk := &dht.stripedPutLocks[indexForLock]
	lk.Lock()
	defer lk.Unlock()

187 188 189 190 191 192 193 194 195 196
	// Make sure the new record is "better" than the record we have locally.
	// This prevents a record with for example a lower sequence number from
	// overwriting a record with a higher sequence number.
	existing, err := dht.getRecordFromDatastore(dskey)
	if err != nil {
		return nil, err
	}

	if existing != nil {
		recs := [][]byte{rec.GetValue(), existing.GetValue()}
197
		i, err := dht.Validator.Select(string(rec.GetKey()), recs)
198
		if err != nil {
Steven Allen's avatar
Steven Allen committed
199
			logger.Warnw("dht record passed validation but failed select", "from", p, "key", rec.GetKey(), "error", err)
200 201 202
			return nil, err
		}
		if i != 0 {
Steven Allen's avatar
Steven Allen committed
203
			logger.Infow("DHT record in PUT older than existing record (ignoring)", "peer", p, "key", rec.GetKey())
204 205 206 207
			return nil, errors.New("old record")
		}
	}

208
	// record the time we receive every record
209
	rec.TimeReceived = u.FormatRFC3339(time.Now())
210 211

	data, err := proto.Marshal(rec)
212 213 214 215 216
	if err != nil {
		return nil, err
	}

	err = dht.datastore.Put(dskey, data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
217
	return pmes, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218 219
}

Steven Allen's avatar
Steven Allen committed
220 221
// returns nil, nil when either nothing is found or the value found doesn't properly validate.
// returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong)
222
func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) {
223
	buf, err := dht.datastore.Get(dskey)
224 225 226 227
	if err == ds.ErrNotFound {
		return nil, nil
	}
	if err != nil {
Steven Allen's avatar
Steven Allen committed
228
		logger.Errorw("error retrieving record from datastore", "key", dskey, "error", err)
229 230 231
		return nil, err
	}
	rec := new(recpb.Record)
232
	err = proto.Unmarshal(buf, rec)
233 234
	if err != nil {
		// Bad data in datastore, log it but don't return an error, we'll just overwrite it
Steven Allen's avatar
Steven Allen committed
235
		logger.Errorw("failed to unmarshal record from datastore", "key", dskey, "error", err)
236 237 238
		return nil, nil
	}

239
	err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue())
240 241 242
	if err != nil {
		// Invalid record in datastore, probably expired but don't return an error,
		// we'll just overwrite it
Steven Allen's avatar
Steven Allen committed
243
		logger.Debugw("local record verify failed", "key", rec.GetKey(), "error", err)
244 245 246 247 248 249
		return nil, nil
	}

	return rec, nil
}

250
func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Matt Joiner's avatar
Matt Joiner committed
251
	logger.Debugf("%s Responding to ping from %s!\n", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252
	return pmes, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
253 254
}

255
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, from peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) {
256
	resp := pb.NewMessage(pmes.GetType(), nil, pmes.GetClusterLevel())
257
	var closest []peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
258

Steven Allen's avatar
Steven Allen committed
259 260 261 262
	if len(pmes.GetKey()) == 0 {
		return nil, fmt.Errorf("handleFindPeer with empty key")
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
263
	// if looking for self... special case where we send it on CloserPeers.
264 265
	targetPid := peer.ID(pmes.GetKey())
	if targetPid == dht.self {
266
		closest = []peer.ID{dht.self}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
267
	} else {
268
		closest = dht.betterPeersToQuery(pmes, from, dht.bucketSize)
269 270

		// Never tell a peer about itself.
271 272 273
		if targetPid != from {
			// Add the target peer to the set of closest peers if
			// not already present in our routing table.
274
			//
275 276 277 278 279 280 281 282 283 284 285
			// Later, when we lookup known addresses for all peers
			// in this set, we'll prune this peer if we don't
			// _actually_ know where it is.
			found := false
			for _, p := range closest {
				if targetPid == p {
					found = true
					break
				}
			}
			if !found {
286 287 288
				closest = append(closest, targetPid)
			}
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
289 290 291 292 293 294
	}

	if closest == nil {
		return resp, nil
	}

295
	// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
Jeromy's avatar
Jeromy committed
296
	closestinfos := pstore.PeerInfos(dht.peerstore, closest)
Steven Allen's avatar
Steven Allen committed
297
	// possibly an over-allocation but this array is temporary anyways.
298
	withAddresses := make([]peer.AddrInfo, 0, len(closestinfos))
299 300 301
	for _, pi := range closestinfos {
		if len(pi.Addrs) > 0 {
			withAddresses = append(withAddresses, pi)
302
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
303 304
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
305
	resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), withAddresses)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
306 307 308
	return resp, nil
}

309
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) {
310 311 312
	key := pmes.GetKey()
	if len(key) > 80 {
		return nil, fmt.Errorf("handleGetProviders key size too large")
Steven Allen's avatar
Steven Allen committed
313 314
	} else if len(key) == 0 {
		return nil, fmt.Errorf("handleGetProviders key is empty")
315
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
316

Steven Allen's avatar
Steven Allen committed
317 318
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
319
	// setup providers
320
	providers := dht.ProviderManager.GetProviders(ctx, key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
321

322
	if len(providers) > 0 {
323
		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
Jeromy's avatar
Jeromy committed
324
		infos := pstore.PeerInfos(dht.peerstore, providers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
325
		resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
326 327 328
	}

	// Also send closer peers.
329
	closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
330
	if closer != nil {
331
		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
Jeromy's avatar
Jeromy committed
332
		infos := pstore.PeerInfos(dht.peerstore, closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
333
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
334 335 336 337 338
	}

	return resp, nil
}

339
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) {
340 341
	key := pmes.GetKey()
	if len(key) > 80 {
Steven Allen's avatar
Steven Allen committed
342 343 344
		return nil, fmt.Errorf("handleAddProvider key size too large")
	} else if len(key) == 0 {
		return nil, fmt.Errorf("handleAddProvider key is empty")
345
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
346

Steven Allen's avatar
Steven Allen committed
347
	logger.Debugf("adding provider", "from", p, "key", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
348

349
	// add provider should use the address given in the message
350 351 352
	pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
	for _, pi := range pinfos {
		if pi.ID != p {
353 354
			// we should ignore this provider record! not from originator.
			// (we should sign them and check signature later...)
Steven Allen's avatar
Steven Allen committed
355
			logger.Debugw("received provider from wrong peer", "from", p, "peer", pi.ID)
356 357
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
358

359
		if len(pi.Addrs) < 1 {
Steven Allen's avatar
Steven Allen committed
360
			logger.Debugw("no valid addresses for provider", "from", p)
361 362
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
363

364
		if pi.ID != dht.self { // don't add own addrs.
365
			// add the received addresses to our peerstore.
366
			dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
367
		}
368
		dht.ProviderManager.AddProvider(ctx, key, p)
369 370
	}

Jeromy's avatar
Jeromy committed
371
	return nil, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
372
}
373

374 375
func convertToDsKey(s []byte) ds.Key {
	return ds.NewKey(base32.RawStdEncoding.EncodeToString(s))
376
}