handlers.go 11 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6
	"errors"
	"fmt"
7
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8

9
	proto "github.com/gogo/protobuf/proto"
10
	cid "github.com/ipfs/go-cid"
George Antoniadis's avatar
George Antoniadis committed
11
	ds "github.com/ipfs/go-datastore"
12
	u "github.com/ipfs/go-ipfs-util"
George Antoniadis's avatar
George Antoniadis committed
13
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
14 15 16
	lgbl "github.com/libp2p/go-libp2p-loggables"
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
17
	recpb "github.com/libp2p/go-libp2p-record/pb"
18
	base32 "github.com/whyrusleeping/base32"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20
)

Jeromy's avatar
Jeromy committed
21
// The number of closer peers to send on requests.
Jeromy's avatar
Jeromy committed
22
var CloserPeerCount = KValue
23

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24
// dhthandler specifies the signature of functions that handle DHT messages.
25
type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26

27
func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
	switch t {
29
	case pb.Message_GET_VALUE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30
		return dht.handleGetValue
31
	case pb.Message_PUT_VALUE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32
		return dht.handlePutValue
33
	case pb.Message_FIND_NODE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34
		return dht.handleFindPeer
35
	case pb.Message_ADD_PROVIDER:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36
		return dht.handleAddProvider
37
	case pb.Message_GET_PROVIDERS:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38
		return dht.handleGetProviders
39
	case pb.Message_PING:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40 41 42 43 44 45
		return dht.handlePing
	default:
		return nil
	}
}

ForrestWeston's avatar
ForrestWeston committed
46 47 48 49 50 51 52 53
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) {
	eip := log.EventBegin(ctx, "handleGetValue", p)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
54
	log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55 56

	// setup response
57
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58

59
	// first, is there even a key?
60
	k := pmes.GetKey()
61
	if k == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
		return nil, errors.New("handleGetValue but no key was provided")
63
		// TODO: send back an error response? could be bad, but the other node's hanging.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65
	}

66 67
	rec, err := dht.checkLocalDatastore(k)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68 69
		return nil, err
	}
70
	resp.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71 72

	// Find closest peer on given cluster to desired key and reply with that info
73
	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Jeromy's avatar
Jeromy committed
74
	if len(closer) > 0 {
Jeromy's avatar
Jeromy committed
75
		closerinfos := pstore.PeerInfos(dht.peerstore, closer)
76 77 78
		for _, pi := range closerinfos {
			log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
			if len(pi.Addrs) < 1 {
79
				log.Warningf(`no addresses on peer being sent!
80 81 82
					[local:%s]
					[sending:%s]
					[remote:%s]`, dht.self, pi.ID, p)
Jeromy's avatar
Jeromy committed
83
			}
84
		}
85

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87 88 89 90 91
	}

	return resp, nil
}

92
func (dht *IpfsDHT) checkLocalDatastore(k string) (*recpb.Record, error) {
93
	log.Debugf("%s handleGetValue looking into ds", dht.self)
94
	dskey := convertToDsKey(k)
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
	iVal, err := dht.datastore.Get(dskey)
	log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)

	if err == ds.ErrNotFound {
		return nil, nil
	}

	// if we got an unexpected error, bail.
	if err != nil {
		return nil, err
	}

	// if we have the value, send it back
	log.Debugf("%s handleGetValue success!", dht.self)

	byts, ok := iVal.([]byte)
	if !ok {
		return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
	}

George Antoniadis's avatar
George Antoniadis committed
115
	rec := new(recpb.Record)
116 117
	err = proto.Unmarshal(byts, rec)
	if err != nil {
118
		log.Debug("failed to unmarshal DHT record from datastore")
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
		return nil, err
	}

	var recordIsBad bool
	recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
	if err != nil {
		log.Info("either no receive time set on record, or it was invalid: ", err)
		recordIsBad = true
	}

	if time.Now().Sub(recvtime) > MaxRecordAge {
		log.Debug("old record found, tossing.")
		recordIsBad = true
	}

134
	// NOTE: We do not verify the record here beyond checking these timestamps.
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
	// we put the burden of checking the records on the requester as checking a record
	// may be computationally expensive

	if recordIsBad {
		err := dht.datastore.Delete(dskey)
		if err != nil {
			log.Error("Failed to delete bad record from datastore: ", err)
		}

		return nil, nil // can treat this as not having the record at all
	}

	return rec, nil
}

150 151 152 153 154 155
// Cleans the record (to avoid storing arbitrary data).
func cleanRecord(rec *recpb.Record) {
	rec.XXX_unrecognized = nil
	rec.TimeReceived = nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156
// Store a value in this peer local storage
ForrestWeston's avatar
ForrestWeston committed
157 158 159 160 161 162 163 164 165
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) {
	eip := log.EventBegin(ctx, "handlePutValue", p)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()

166
	dskey := convertToDsKey(pmes.GetKey())
167

168 169 170 171 172
	rec := pmes.GetRecord()
	if rec == nil {
		log.Infof("Got nil record from: %s", p.Pretty())
		return nil, errors.New("nil record")
	}
173
	cleanRecord(rec)
174

175
	// Make sure the record is valid (not expired, valid signature etc)
176 177
	if err = dht.Validator.VerifyRecord(rec); err != nil {
		log.Warningf("Bad dht record in PUT from: %s. %s", p.Pretty(), err)
178 179 180
		return nil, err
	}

181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
	// Make sure the new record is "better" than the record we have locally.
	// This prevents a record with for example a lower sequence number from
	// overwriting a record with a higher sequence number.
	existing, err := dht.getRecordFromDatastore(dskey)
	if err != nil {
		return nil, err
	}

	if existing != nil {
		recs := [][]byte{rec.GetValue(), existing.GetValue()}
		i, err := dht.Selector.BestRecord(pmes.GetKey(), recs)
		if err != nil {
			log.Warningf("Bad dht record in PUT from %s: %s", p.Pretty(), err)
			return nil, err
		}
		if i != 0 {
			log.Infof("DHT record in PUT from %s is older than existing record. Ignoring", p.Pretty())
			return nil, errors.New("old record")
		}
	}

202 203 204 205
	// record the time we receive every record
	rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))

	data, err := proto.Marshal(rec)
206 207 208 209 210
	if err != nil {
		return nil, err
	}

	err = dht.datastore.Put(dskey, data)
Jeromy's avatar
Jeromy committed
211
	log.Debugf("%s handlePutValue %v", dht.self, dskey)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212
	return pmes, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213 214
}

215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) {
	reci, err := dht.datastore.Get(dskey)
	if err == ds.ErrNotFound {
		return nil, nil
	}
	if err != nil {
		log.Warningf("Got error retrieving record with key %s from datastore: %s", dskey, err)
		return nil, err
	}

	byt, ok := reci.([]byte)
	if !ok {
		// Bad data in datastore, log it but don't return an error, we'll just overwrite it
		log.Warningf("Value stored in datastore with key %s is not []byte", dskey)
		return nil, nil
	}

	rec := new(recpb.Record)
	err = proto.Unmarshal(byt, rec)
	if err != nil {
		// Bad data in datastore, log it but don't return an error, we'll just overwrite it
		log.Warningf("Bad record data stored in datastore with key %s: could not unmarshal record", dskey)
		return nil, nil
	}

	err = dht.Validator.VerifyRecord(rec)
	if err != nil {
		// Invalid record in datastore, probably expired but don't return an error,
		// we'll just overwrite it
		log.Debugf("Local record verify failed: %s (discarded)", err)
		return nil, nil
	}

	return rec, nil
}

251
func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252
	log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
253
	return pmes, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
254 255
}

256
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
257
	defer log.EventBegin(ctx, "handleFindPeer", p).Done()
258
	resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
259
	var closest []peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
260 261

	// if looking for self... special case where we send it on CloserPeers.
262 263
	if peer.ID(pmes.GetKey()) == dht.self {
		closest = []peer.ID{dht.self}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
264
	} else {
265
		closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
266 267 268
	}

	if closest == nil {
269
		log.Infof("%s handleFindPeer %s: could not find anything.", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
270 271 272
		return resp, nil
	}

Jeromy's avatar
Jeromy committed
273
	closestinfos := pstore.PeerInfos(dht.peerstore, closest)
Steven Allen's avatar
Steven Allen committed
274 275
	// possibly an over-allocation but this array is temporary anyways.
	withAddresses := make([]pstore.PeerInfo, 0, len(closestinfos))
276 277 278 279
	for _, pi := range closestinfos {
		if len(pi.Addrs) > 0 {
			withAddresses = append(withAddresses, pi)
			log.Debugf("handleFindPeer: sending back '%s'", pi.ID)
280
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
281 282
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
283
	resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), withAddresses)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
284 285 286
	return resp, nil
}

287
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
288 289
	lm := make(lgbl.DeferredMap)
	lm["peer"] = func() interface{} { return p.Pretty() }
ForrestWeston's avatar
ForrestWeston committed
290 291
	eip := log.EventBegin(ctx, "handleGetProviders", lm)
	defer eip.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
292

293
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
294 295
	c, err := cid.Cast([]byte(pmes.GetKey()))
	if err != nil {
ForrestWeston's avatar
ForrestWeston committed
296
		eip.SetError(err)
297 298 299 300
		return nil, err
	}

	lm["key"] = func() interface{} { return c.String() }
301 302

	// debug logging niceness.
303
	reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, c)
304 305
	log.Debugf("%s begin", reqDesc)
	defer log.Debugf("%s end", reqDesc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
306 307

	// check if we have this value, to add ourselves as provider.
308
	has, err := dht.datastore.Has(convertToDsKey(c.KeyString()))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
309
	if err != nil && err != ds.ErrNotFound {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
310
		log.Debugf("unexpected datastore error: %v\n", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
311 312 313 314
		has = false
	}

	// setup providers
315
	providers := dht.providers.GetProviders(ctx, c)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
316 317
	if has {
		providers = append(providers, dht.self)
318
		log.Debugf("%s have the value. added self as provider", reqDesc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
319 320 321
	}

	if providers != nil && len(providers) > 0 {
Jeromy's avatar
Jeromy committed
322
		infos := pstore.PeerInfos(dht.peerstore, providers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
323
		resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
324
		log.Debugf("%s have %d providers: %s", reqDesc, len(providers), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
325 326 327
	}

	// Also send closer peers.
328
	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
329
	if closer != nil {
Jeromy's avatar
Jeromy committed
330
		infos := pstore.PeerInfos(dht.peerstore, closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
331
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
332
		log.Debugf("%s have %d closer peers: %s", reqDesc, len(closer), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
333 334 335 336 337
	}

	return resp, nil
}

338
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
339 340
	lm := make(lgbl.DeferredMap)
	lm["peer"] = func() interface{} { return p.Pretty() }
ForrestWeston's avatar
ForrestWeston committed
341 342
	eip := log.EventBegin(ctx, "handleAddProvider", lm)
	defer eip.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
343

344 345
	c, err := cid.Cast([]byte(pmes.GetKey()))
	if err != nil {
ForrestWeston's avatar
ForrestWeston committed
346
		eip.SetError(err)
347 348 349 350
		return nil, err
	}

	lm["key"] = func() interface{} { return c.String() }
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
351

352
	log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, c)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
353

354
	// add provider should use the address given in the message
355 356 357 358 359
	pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
	for _, pi := range pinfos {
		if pi.ID != p {
			// we should ignore this provider reccord! not from originator.
			// (we chould sign them and check signature later...)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
360
			log.Debugf("handleAddProvider received provider %s from %s. Ignore.", pi.ID, p)
361 362
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
363

364
		if len(pi.Addrs) < 1 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
365
			log.Debugf("%s got no valid addresses for provider %s. Ignore.", dht.self, p)
366 367
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
368

369
		log.Infof("received provider %s for %s (addrs: %s)", p, c, pi.Addrs)
370
		if pi.ID != dht.self { // dont add own addrs.
371
			// add the received addresses to our peerstore.
Jeromy's avatar
Jeromy committed
372
			dht.peerstore.AddAddrs(pi.ID, pi.Addrs, pstore.ProviderAddrTTL)
373
		}
374
		dht.providers.AddProvider(ctx, c, p)
375 376
	}

Jeromy's avatar
Jeromy committed
377
	return nil, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
378
}
379 380 381 382

func convertToDsKey(s string) ds.Key {
	return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}