handlers.go 10.8 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6 7
	"errors"
	"fmt"
8
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9

10 11 12 13
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	pstore "github.com/libp2p/go-libp2p-peerstore"

14
	"github.com/gogo/protobuf/proto"
George Antoniadis's avatar
George Antoniadis committed
15
	ds "github.com/ipfs/go-datastore"
16
	u "github.com/ipfs/go-ipfs-util"
17
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
George Antoniadis's avatar
George Antoniadis committed
18
	recpb "github.com/libp2p/go-libp2p-record/pb"
Steven Allen's avatar
Steven Allen committed
19
	"github.com/multiformats/go-base32"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21 22
)

// dhthandler specifies the signature of functions that handle DHT messages.
23
type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24

25
func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26
	switch t {
27
	case pb.Message_FIND_NODE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
		return dht.handleFindPeer
29
	case pb.Message_PING:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30 31
		return dht.handlePing
	}
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51

	if dht.enableValues {
		switch t {
		case pb.Message_GET_VALUE:
			return dht.handleGetValue
		case pb.Message_PUT_VALUE:
			return dht.handlePutValue
		}
	}

	if dht.enableProviders {
		switch t {
		case pb.Message_ADD_PROVIDER:
			return dht.handleAddProvider
		case pb.Message_GET_PROVIDERS:
			return dht.handleGetProviders
		}
	}

	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52 53
}

ForrestWeston's avatar
ForrestWeston committed
54
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55
	// setup response
56
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57

58
	// first, is there even a key?
59
	k := pmes.GetKey()
60
	if len(k) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61
		return nil, errors.New("handleGetValue but no key was provided")
62
		// TODO: send back an error response? could be bad, but the other node's hanging.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63 64
	}

65 66
	rec, err := dht.checkLocalDatastore(k)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67 68
		return nil, err
	}
69
	resp.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70 71

	// Find closest peer on given cluster to desired key and reply with that info
72
	closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
Jeromy's avatar
Jeromy committed
73
	if len(closer) > 0 {
74
		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
Jeromy's avatar
Jeromy committed
75
		closerinfos := pstore.PeerInfos(dht.peerstore, closer)
76
		for _, pi := range closerinfos {
Matt Joiner's avatar
Matt Joiner committed
77
			logger.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
78
			if len(pi.Addrs) < 1 {
Steven Allen's avatar
Steven Allen committed
79 80 81 82 83
				logger.Warnw("no addresses on peer being sent",
					"local", dht.self,
					"to", p,
					"sending", pi.ID,
				)
Jeromy's avatar
Jeromy committed
84
			}
85
		}
86

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88 89 90 91 92
	}

	return resp, nil
}

93
func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
Matt Joiner's avatar
Matt Joiner committed
94
	logger.Debugf("%s handleGetValue looking into ds", dht.self)
95
	dskey := convertToDsKey(k)
96
	buf, err := dht.datastore.Get(dskey)
Matt Joiner's avatar
Matt Joiner committed
97
	logger.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, buf)
98 99 100 101 102 103 104 105 106 107 108

	if err == ds.ErrNotFound {
		return nil, nil
	}

	// if we got an unexpected error, bail.
	if err != nil {
		return nil, err
	}

	// if we have the value, send it back
Matt Joiner's avatar
Matt Joiner committed
109
	logger.Debugf("%s handleGetValue success!", dht.self)
110

George Antoniadis's avatar
George Antoniadis committed
111
	rec := new(recpb.Record)
112
	err = proto.Unmarshal(buf, rec)
113
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
114
		logger.Debug("failed to unmarshal DHT record from datastore")
115 116 117 118 119 120
		return nil, err
	}

	var recordIsBad bool
	recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
121
		logger.Info("either no receive time set on record, or it was invalid: ", err)
122 123 124
		recordIsBad = true
	}

Aarsh Shah's avatar
Aarsh Shah committed
125
	if time.Since(recvtime) > dht.maxRecordAge {
Matt Joiner's avatar
Matt Joiner committed
126
		logger.Debug("old record found, tossing.")
127 128 129
		recordIsBad = true
	}

130
	// NOTE: We do not verify the record here beyond checking these timestamps.
131 132 133 134 135 136
	// we put the burden of checking the records on the requester as checking a record
	// may be computationally expensive

	if recordIsBad {
		err := dht.datastore.Delete(dskey)
		if err != nil {
Matt Joiner's avatar
Matt Joiner committed
137
			logger.Error("Failed to delete bad record from datastore: ", err)
138 139 140 141 142 143 144 145
		}

		return nil, nil // can treat this as not having the record at all
	}

	return rec, nil
}

146 147
// Cleans the record (to avoid storing arbitrary data).
func cleanRecord(rec *recpb.Record) {
148
	rec.TimeReceived = ""
149 150
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151
// Store a value in this peer local storage
ForrestWeston's avatar
ForrestWeston committed
152
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) {
153 154
	rec := pmes.GetRecord()
	if rec == nil {
Steven Allen's avatar
Steven Allen committed
155
		logger.Debugw("got nil record from", "from", p)
156 157
		return nil, errors.New("nil record")
	}
158

159
	if !bytes.Equal(pmes.GetKey(), rec.GetKey()) {
160 161 162
		return nil, errors.New("put key doesn't match record key")
	}

163
	cleanRecord(rec)
164

165
	// Make sure the record is valid (not expired, valid signature etc)
166
	if err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue()); err != nil {
Steven Allen's avatar
Steven Allen committed
167
		logger.Infow("bad dht record in PUT", "from", p, "key", rec.GetKey(), "error", err)
168 169 170
		return nil, err
	}

171 172
	dskey := convertToDsKey(rec.GetKey())

173 174 175 176 177 178 179 180 181 182 183
	// fetch the striped lock for this key
	var indexForLock byte
	if len(rec.GetKey()) == 0 {
		indexForLock = 0
	} else {
		indexForLock = rec.GetKey()[len(rec.GetKey())-1]
	}
	lk := &dht.stripedPutLocks[indexForLock]
	lk.Lock()
	defer lk.Unlock()

184 185 186 187 188 189 190 191 192 193
	// Make sure the new record is "better" than the record we have locally.
	// This prevents a record with for example a lower sequence number from
	// overwriting a record with a higher sequence number.
	existing, err := dht.getRecordFromDatastore(dskey)
	if err != nil {
		return nil, err
	}

	if existing != nil {
		recs := [][]byte{rec.GetValue(), existing.GetValue()}
194
		i, err := dht.Validator.Select(string(rec.GetKey()), recs)
195
		if err != nil {
Steven Allen's avatar
Steven Allen committed
196
			logger.Warnw("dht record passed validation but failed select", "from", p, "key", rec.GetKey(), "error", err)
197 198 199
			return nil, err
		}
		if i != 0 {
Steven Allen's avatar
Steven Allen committed
200
			logger.Infow("DHT record in PUT older than existing record (ignoring)", "peer", p, "key", rec.GetKey())
201 202 203 204
			return nil, errors.New("old record")
		}
	}

205
	// record the time we receive every record
206
	rec.TimeReceived = u.FormatRFC3339(time.Now())
207 208

	data, err := proto.Marshal(rec)
209 210 211 212 213
	if err != nil {
		return nil, err
	}

	err = dht.datastore.Put(dskey, data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214
	return pmes, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
215 216
}

Steven Allen's avatar
Steven Allen committed
217 218
// returns nil, nil when either nothing is found or the value found doesn't properly validate.
// returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong)
219
func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) {
220
	buf, err := dht.datastore.Get(dskey)
221 222 223 224
	if err == ds.ErrNotFound {
		return nil, nil
	}
	if err != nil {
Steven Allen's avatar
Steven Allen committed
225
		logger.Errorw("error retrieving record from datastore", "key", dskey, "error", err)
226 227 228
		return nil, err
	}
	rec := new(recpb.Record)
229
	err = proto.Unmarshal(buf, rec)
230 231
	if err != nil {
		// Bad data in datastore, log it but don't return an error, we'll just overwrite it
Steven Allen's avatar
Steven Allen committed
232
		logger.Errorw("failed to unmarshal record from datastore", "key", dskey, "error", err)
233 234 235
		return nil, nil
	}

236
	err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue())
237 238 239
	if err != nil {
		// Invalid record in datastore, probably expired but don't return an error,
		// we'll just overwrite it
Steven Allen's avatar
Steven Allen committed
240
		logger.Debugw("local record verify failed", "key", rec.GetKey(), "error", err)
241 242 243 244 245 246
		return nil, nil
	}

	return rec, nil
}

247
func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Matt Joiner's avatar
Matt Joiner committed
248
	logger.Debugf("%s Responding to ping from %s!\n", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
249
	return pmes, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
250 251
}

252
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, from peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) {
253
	resp := pb.NewMessage(pmes.GetType(), nil, pmes.GetClusterLevel())
254
	var closest []peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
255 256

	// if looking for self... special case where we send it on CloserPeers.
257 258
	targetPid := peer.ID(pmes.GetKey())
	if targetPid == dht.self {
259
		closest = []peer.ID{dht.self}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
260
	} else {
261
		closest = dht.betterPeersToQuery(pmes, from, dht.bucketSize)
262 263

		// Never tell a peer about itself.
264 265 266
		if targetPid != from {
			// Add the target peer to the set of closest peers if
			// not already present in our routing table.
267
			//
268 269 270 271 272 273 274 275 276 277 278
			// Later, when we lookup known addresses for all peers
			// in this set, we'll prune this peer if we don't
			// _actually_ know where it is.
			found := false
			for _, p := range closest {
				if targetPid == p {
					found = true
					break
				}
			}
			if !found {
279 280 281
				closest = append(closest, targetPid)
			}
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
282 283 284 285 286 287
	}

	if closest == nil {
		return resp, nil
	}

288
	// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
Jeromy's avatar
Jeromy committed
289
	closestinfos := pstore.PeerInfos(dht.peerstore, closest)
Steven Allen's avatar
Steven Allen committed
290
	// possibly an over-allocation but this array is temporary anyways.
291
	withAddresses := make([]peer.AddrInfo, 0, len(closestinfos))
292 293 294
	for _, pi := range closestinfos {
		if len(pi.Addrs) > 0 {
			withAddresses = append(withAddresses, pi)
295
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
296 297
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
298
	resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), withAddresses)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
299 300 301
	return resp, nil
}

302
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) {
303
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
304 305 306
	key := pmes.GetKey()
	if len(key) > 80 {
		return nil, fmt.Errorf("handleGetProviders key size too large")
307
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
308 309

	// check if we have this value, to add ourselves as provider.
310
	has, err := dht.datastore.Has(convertToDsKey(key))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
311
	if err != nil && err != ds.ErrNotFound {
Steven Allen's avatar
Steven Allen committed
312 313 314
		// FIXME: This doesn't work reliably. If we want this check, we
		// need a _blockstore_.
		logger.Errorw("error checking datastore for block", "key", key, "error", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
315 316 317 318
		has = false
	}

	// setup providers
319
	providers := dht.ProviderManager.GetProviders(ctx, key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
320 321 322 323
	if has {
		providers = append(providers, dht.self)
	}

324
	if len(providers) > 0 {
325
		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
Jeromy's avatar
Jeromy committed
326
		infos := pstore.PeerInfos(dht.peerstore, providers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
327
		resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
328 329 330
	}

	// Also send closer peers.
331
	closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
332
	if closer != nil {
333
		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
Jeromy's avatar
Jeromy committed
334
		infos := pstore.PeerInfos(dht.peerstore, closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
335
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
336 337 338 339 340
	}

	return resp, nil
}

341
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) {
342 343 344
	key := pmes.GetKey()
	if len(key) > 80 {
		return nil, fmt.Errorf("handleAddProviders key size too large")
345
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
346

Steven Allen's avatar
Steven Allen committed
347
	logger.Debugf("adding provider", "from", p, "key", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
348

349
	// add provider should use the address given in the message
350 351 352
	pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
	for _, pi := range pinfos {
		if pi.ID != p {
353 354
			// we should ignore this provider record! not from originator.
			// (we should sign them and check signature later...)
Steven Allen's avatar
Steven Allen committed
355
			logger.Debugw("received provider from wrong peer", "from", p, "peer", pi.ID)
356 357
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
358

359
		if len(pi.Addrs) < 1 {
Steven Allen's avatar
Steven Allen committed
360
			logger.Debugw("no valid addresses for provider", "from", p)
361 362
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
363

364
		if pi.ID != dht.self { // don't add own addrs.
365
			// add the received addresses to our peerstore.
366
			dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
367
		}
368
		dht.ProviderManager.AddProvider(ctx, key, p)
369 370
	}

Jeromy's avatar
Jeromy committed
371
	return nil, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
372
}
373

374 375
func convertToDsKey(s []byte) ds.Key {
	return ds.NewKey(base32.RawStdEncoding.EncodeToString(s))
376
}