handlers.go 9.49 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6
	"errors"
	"fmt"
7
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8

9
	proto "github.com/gogo/protobuf/proto"
10
	cid "github.com/ipfs/go-cid"
George Antoniadis's avatar
George Antoniadis committed
11
	ds "github.com/ipfs/go-datastore"
12
	u "github.com/ipfs/go-ipfs-util"
George Antoniadis's avatar
George Antoniadis committed
13
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
14 15 16
	lgbl "github.com/libp2p/go-libp2p-loggables"
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
17
	recpb "github.com/libp2p/go-libp2p-record/pb"
18
	base32 "github.com/whyrusleeping/base32"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20
)

Jeromy's avatar
Jeromy committed
21
// CloserPeerCount is the number of closer peers to send on requests.
Jeromy's avatar
Jeromy committed
22
var CloserPeerCount = KValue
23

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24
// dhtHandler specifies the signature of functions that handle DHT messages.
25
type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26

27
func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
	switch t {
29
	case pb.Message_GET_VALUE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30
		return dht.handleGetValue
31
	case pb.Message_PUT_VALUE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32
		return dht.handlePutValue
33
	case pb.Message_FIND_NODE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34
		return dht.handleFindPeer
35
	case pb.Message_ADD_PROVIDER:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36
		return dht.handleAddProvider
37
	case pb.Message_GET_PROVIDERS:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38
		return dht.handleGetProviders
39
	case pb.Message_PING:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40 41 42 43 44 45
		return dht.handlePing
	default:
		return nil
	}
}

ForrestWeston's avatar
ForrestWeston committed
46 47 48 49 50 51 52 53
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) {
	eip := log.EventBegin(ctx, "handleGetValue", p)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
54
	log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55 56

	// setup response
57
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58

59
	// first, is there even a key?
60
	k := pmes.GetKey()
61
	if k == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
		return nil, errors.New("handleGetValue but no key was provided")
63
		// TODO: send back an error response? could be bad, but the other node's hanging.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65
	}

66 67
	rec, err := dht.checkLocalDatastore(k)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68 69
		return nil, err
	}
70
	resp.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71 72

	// Find closest peer on given cluster to desired key and reply with that info
73
	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Jeromy's avatar
Jeromy committed
74
	if len(closer) > 0 {
Jeromy's avatar
Jeromy committed
75
		closerinfos := pstore.PeerInfos(dht.peerstore, closer)
76 77 78
		for _, pi := range closerinfos {
			log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
			if len(pi.Addrs) < 1 {
79
				log.Warningf(`no addresses on peer being sent!
80 81 82
					[local:%s]
					[sending:%s]
					[remote:%s]`, dht.self, pi.ID, p)
Jeromy's avatar
Jeromy committed
83
			}
84
		}
85

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87 88 89 90 91
	}

	return resp, nil
}

92
func (dht *IpfsDHT) checkLocalDatastore(k string) (*recpb.Record, error) {
93
	log.Debugf("%s handleGetValue looking into ds", dht.self)
94
	dskey := convertToDsKey(k)
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
	iVal, err := dht.datastore.Get(dskey)
	log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)

	if err == ds.ErrNotFound {
		return nil, nil
	}

	// if we got an unexpected error, bail.
	if err != nil {
		return nil, err
	}

	// if we have the value, send it back
	log.Debugf("%s handleGetValue success!", dht.self)

	byts, ok := iVal.([]byte)
	if !ok {
		return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
	}

George Antoniadis's avatar
George Antoniadis committed
115
	rec := new(recpb.Record)
116 117
	err = proto.Unmarshal(byts, rec)
	if err != nil {
118
		log.Debug("failed to unmarshal DHT record from datastore")
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
		return nil, err
	}

	// if its our record, dont bother checking the times on it
	if peer.ID(rec.GetAuthor()) == dht.self {
		return rec, nil
	}

	var recordIsBad bool
	recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
	if err != nil {
		log.Info("either no receive time set on record, or it was invalid: ", err)
		recordIsBad = true
	}

	if time.Now().Sub(recvtime) > MaxRecordAge {
		log.Debug("old record found, tossing.")
		recordIsBad = true
	}

139
	// NOTE: We do not verify the record here beyond checking these timestamps.
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
	// we put the burden of checking the records on the requester as checking a record
	// may be computationally expensive

	if recordIsBad {
		err := dht.datastore.Delete(dskey)
		if err != nil {
			log.Error("Failed to delete bad record from datastore: ", err)
		}

		return nil, nil // can treat this as not having the record at all
	}

	return rec, nil
}

155 156 157 158 159 160 161 162 163 164 165 166
// cleanRecord strips fields from rec that should not be stored as received,
// so that a remote peer cannot make us store arbitrary data: the author when
// it is not backed by a signature, the receive time, and any unrecognized
// protobuf fields.
func cleanRecord(rec *recpb.Record) {
	// Without a signature the author is unvalidated and could be anything,
	// so it is only kept on signed records.
	if len(rec.Signature) == 0 {
		rec.Author = nil
	}

	rec.TimeReceived = nil
	rec.XXX_unrecognized = nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167
// Store a value in this peer local storage
ForrestWeston's avatar
ForrestWeston committed
168 169 170 171 172 173 174 175 176
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) {
	eip := log.EventBegin(ctx, "handlePutValue", p)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()

177
	dskey := convertToDsKey(pmes.GetKey())
178

179 180 181 182 183
	rec := pmes.GetRecord()
	if rec == nil {
		log.Infof("Got nil record from: %s", p.Pretty())
		return nil, errors.New("nil record")
	}
184
	cleanRecord(rec)
185

ForrestWeston's avatar
ForrestWeston committed
186
	if err = dht.verifyRecordLocally(rec); err != nil {
187
		log.Warningf("Bad dht record in PUT from: %s. %s", peer.ID(pmes.GetRecord().GetAuthor()), err)
188 189 190
		return nil, err
	}

191 192 193 194
	// record the time we receive every record
	rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))

	data, err := proto.Marshal(rec)
195 196 197 198 199
	if err != nil {
		return nil, err
	}

	err = dht.datastore.Put(dskey, data)
Jeromy's avatar
Jeromy committed
200
	log.Debugf("%s handlePutValue %v", dht.self, dskey)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
201
	return pmes, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202 203
}

204
func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
205
	log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
206
	return pmes, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207 208
}

209
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
210
	defer log.EventBegin(ctx, "handleFindPeer", p).Done()
211
	resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
212
	var closest []peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213 214

	// if looking for self... special case where we send it on CloserPeers.
215 216
	if peer.ID(pmes.GetKey()) == dht.self {
		closest = []peer.ID{dht.self}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
217
	} else {
218
		closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219 220 221
	}

	if closest == nil {
222
		log.Infof("%s handleFindPeer %s: could not find anything.", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
223 224 225
		return resp, nil
	}

Jeromy's avatar
Jeromy committed
226
	closestinfos := pstore.PeerInfos(dht.peerstore, closest)
Steven Allen's avatar
Steven Allen committed
227 228
	// possibly an over-allocation but this array is temporary anyways.
	withAddresses := make([]pstore.PeerInfo, 0, len(closestinfos))
229 230 231 232
	for _, pi := range closestinfos {
		if len(pi.Addrs) > 0 {
			withAddresses = append(withAddresses, pi)
			log.Debugf("handleFindPeer: sending back '%s'", pi.ID)
233
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234 235
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
236
	resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), withAddresses)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
237 238 239
	return resp, nil
}

240
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
241 242
	lm := make(lgbl.DeferredMap)
	lm["peer"] = func() interface{} { return p.Pretty() }
ForrestWeston's avatar
ForrestWeston committed
243 244
	eip := log.EventBegin(ctx, "handleGetProviders", lm)
	defer eip.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
245

246
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
247 248
	c, err := cid.Cast([]byte(pmes.GetKey()))
	if err != nil {
ForrestWeston's avatar
ForrestWeston committed
249
		eip.SetError(err)
250 251 252 253
		return nil, err
	}

	lm["key"] = func() interface{} { return c.String() }
254 255

	// debug logging niceness.
256
	reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, c)
257 258
	log.Debugf("%s begin", reqDesc)
	defer log.Debugf("%s end", reqDesc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
259 260

	// check if we have this value, to add ourselves as provider.
261
	has, err := dht.datastore.Has(convertToDsKey(c.KeyString()))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
262
	if err != nil && err != ds.ErrNotFound {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
263
		log.Debugf("unexpected datastore error: %v\n", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
264 265 266 267
		has = false
	}

	// setup providers
268
	providers := dht.providers.GetProviders(ctx, c)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
269 270
	if has {
		providers = append(providers, dht.self)
271
		log.Debugf("%s have the value. added self as provider", reqDesc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
272 273 274
	}

	if providers != nil && len(providers) > 0 {
Jeromy's avatar
Jeromy committed
275
		infos := pstore.PeerInfos(dht.peerstore, providers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
276
		resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
277
		log.Debugf("%s have %d providers: %s", reqDesc, len(providers), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
278 279 280
	}

	// Also send closer peers.
281
	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
282
	if closer != nil {
Jeromy's avatar
Jeromy committed
283
		infos := pstore.PeerInfos(dht.peerstore, closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
284
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
285
		log.Debugf("%s have %d closer peers: %s", reqDesc, len(closer), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
286 287 288 289 290
	}

	return resp, nil
}

291
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
292 293
	lm := make(lgbl.DeferredMap)
	lm["peer"] = func() interface{} { return p.Pretty() }
ForrestWeston's avatar
ForrestWeston committed
294 295
	eip := log.EventBegin(ctx, "handleAddProvider", lm)
	defer eip.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
296

297 298
	c, err := cid.Cast([]byte(pmes.GetKey()))
	if err != nil {
ForrestWeston's avatar
ForrestWeston committed
299
		eip.SetError(err)
300 301 302 303
		return nil, err
	}

	lm["key"] = func() interface{} { return c.String() }
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
304

305
	log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, c)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
306

307
	// add provider should use the address given in the message
308 309 310 311 312
	pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
	for _, pi := range pinfos {
		if pi.ID != p {
			// we should ignore this provider reccord! not from originator.
			// (we chould sign them and check signature later...)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
313
			log.Debugf("handleAddProvider received provider %s from %s. Ignore.", pi.ID, p)
314 315
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
316

317
		if len(pi.Addrs) < 1 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
318
			log.Debugf("%s got no valid addresses for provider %s. Ignore.", dht.self, p)
319 320
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
321

322
		log.Infof("received provider %s for %s (addrs: %s)", p, c, pi.Addrs)
323
		if pi.ID != dht.self { // dont add own addrs.
324
			// add the received addresses to our peerstore.
Jeromy's avatar
Jeromy committed
325
			dht.peerstore.AddAddrs(pi.ID, pi.Addrs, pstore.ProviderAddrTTL)
326
		}
327
		dht.providers.AddProvider(ctx, c, p)
328 329
	}

Jeromy's avatar
Jeromy committed
330
	return nil, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
331
}
332 333 334 335

// convertToDsKey turns a raw DHT key into a datastore key by encoding the
// key's bytes with base32.RawStdEncoding and wrapping the result in ds.NewKey.
func convertToDsKey(s string) ds.Key {
	encoded := base32.RawStdEncoding.EncodeToString([]byte(s))
	return ds.NewKey(encoded)
}