handlers.go 8.75 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4 5
package dht

import (
	"errors"
	"fmt"
6
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7

8
	key "github.com/ipfs/go-ipfs/blocks/key"
9
	pb "github.com/ipfs/go-ipfs/routing/dht/pb"
10
	lgbl "github.com/ipfs/go-ipfs/thirdparty/loggables"
Jakub Sztandera's avatar
Jakub Sztandera committed
11
	ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore"
Jeromy's avatar
Jeromy committed
12

13 14
	pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore"
	peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
15 16
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
	u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
Jeromy's avatar
Jeromy committed
17
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

Jeromy's avatar
Jeromy committed
20
// The number of closer peers to send on requests.
Jeromy's avatar
Jeromy committed
21
var CloserPeerCount = KValue
22

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
// dhthandler specifies the signature of functions that handle DHT messages.
24
type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25

26
func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27
	switch t {
28
	case pb.Message_GET_VALUE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29
		return dht.handleGetValue
30
	case pb.Message_PUT_VALUE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31
		return dht.handlePutValue
32
	case pb.Message_FIND_NODE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33
		return dht.handleFindPeer
34
	case pb.Message_ADD_PROVIDER:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35
		return dht.handleAddProvider
36
	case pb.Message_GET_PROVIDERS:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37
		return dht.handleGetProviders
38
	case pb.Message_PING:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39 40 41 42 43 44
		return dht.handlePing
	default:
		return nil
	}
}

45
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
46
	defer log.EventBegin(ctx, "handleGetValue", p).Done()
Jeromy's avatar
Jeromy committed
47
	log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48 49

	// setup response
50
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51

52
	// first, is there even a key?
53
	k := key.Key(pmes.GetKey())
54
	if k == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55
		return nil, errors.New("handleGetValue but no key was provided")
56
		// TODO: send back an error response? could be bad, but the other node's hanging.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57 58
	}

59 60
	rec, err := dht.checkLocalDatastore(k)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61 62
		return nil, err
	}
63
	resp.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65

	// Find closest peer on given cluster to desired key and reply with that info
66
	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Jeromy's avatar
Jeromy committed
67
	if len(closer) > 0 {
Jeromy's avatar
Jeromy committed
68
		closerinfos := pstore.PeerInfos(dht.peerstore, closer)
69 70 71
		for _, pi := range closerinfos {
			log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
			if len(pi.Addrs) < 1 {
rht's avatar
rht committed
72
				log.Errorf(`no addresses on peer being sent!
73 74 75
					[local:%s]
					[sending:%s]
					[remote:%s]`, dht.self, pi.ID, p)
Jeromy's avatar
Jeromy committed
76
			}
77
		}
78

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80 81 82 83 84
	}

	return resp, nil
}

85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
func (dht *IpfsDHT) checkLocalDatastore(k key.Key) (*pb.Record, error) {
	log.Debugf("%s handleGetValue looking into ds", dht.self)
	dskey := k.DsKey()
	iVal, err := dht.datastore.Get(dskey)
	log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)

	if err == ds.ErrNotFound {
		return nil, nil
	}

	// if we got an unexpected error, bail.
	if err != nil {
		return nil, err
	}

	// if we have the value, send it back
	log.Debugf("%s handleGetValue success!", dht.self)

	byts, ok := iVal.([]byte)
	if !ok {
		return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
	}

	rec := new(pb.Record)
	err = proto.Unmarshal(byts, rec)
	if err != nil {
111
		log.Debug("failed to unmarshal DHT record from datastore")
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
		return nil, err
	}

	// if its our record, dont bother checking the times on it
	if peer.ID(rec.GetAuthor()) == dht.self {
		return rec, nil
	}

	var recordIsBad bool
	recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
	if err != nil {
		log.Info("either no receive time set on record, or it was invalid: ", err)
		recordIsBad = true
	}

	if time.Now().Sub(recvtime) > MaxRecordAge {
		log.Debug("old record found, tossing.")
		recordIsBad = true
	}

132
	// NOTE: We do not verify the record here beyond checking these timestamps.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
	// we put the burden of checking the records on the requester as checking a record
	// may be computationally expensive

	if recordIsBad {
		err := dht.datastore.Delete(dskey)
		if err != nil {
			log.Error("Failed to delete bad record from datastore: ", err)
		}

		return nil, nil // can treat this as not having the record at all
	}

	return rec, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148
// Store a value in this peer local storage
149
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
150
	defer log.EventBegin(ctx, "handlePutValue", p).Done()
151
	dskey := key.Key(pmes.GetKey()).DsKey()
152

153
	if err := dht.verifyRecordLocally(pmes.GetRecord()); err != nil {
154
		log.Warningf("Bad dht record in PUT from: %s. %s", key.Key(pmes.GetRecord().GetAuthor()), err)
155 156 157
		return nil, err
	}

158 159 160 161 162 163
	rec := pmes.GetRecord()

	// record the time we receive every record
	rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))

	data, err := proto.Marshal(rec)
164 165 166 167 168
	if err != nil {
		return nil, err
	}

	err = dht.datastore.Put(dskey, data)
Jeromy's avatar
Jeromy committed
169
	log.Debugf("%s handlePutValue %v", dht.self, dskey)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170
	return pmes, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171 172
}

173
func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174
	log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
175
	return pmes, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
176 177
}

178
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
179
	defer log.EventBegin(ctx, "handleFindPeer", p).Done()
180
	resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
181
	var closest []peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
182 183

	// if looking for self... special case where we send it on CloserPeers.
184 185
	if peer.ID(pmes.GetKey()) == dht.self {
		closest = []peer.ID{dht.self}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186
	} else {
187
		closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188 189 190
	}

	if closest == nil {
191
		log.Infof("%s handleFindPeer %s: could not find anything.", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192 193 194
		return resp, nil
	}

Jeromy's avatar
Jeromy committed
195 196
	var withAddresses []pstore.PeerInfo
	closestinfos := pstore.PeerInfos(dht.peerstore, closest)
197 198 199 200
	for _, pi := range closestinfos {
		if len(pi.Addrs) > 0 {
			withAddresses = append(withAddresses, pi)
			log.Debugf("handleFindPeer: sending back '%s'", pi.ID)
201
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202 203
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204
	resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), withAddresses)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
205 206 207
	return resp, nil
}

208
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209 210 211 212
	lm := make(lgbl.DeferredMap)
	lm["peer"] = func() interface{} { return p.Pretty() }
	defer log.EventBegin(ctx, "handleGetProviders", lm).Done()

213
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
214
	key := key.Key(pmes.GetKey())
Michael Muré's avatar
Michael Muré committed
215
	lm["key"] = func() interface{} { return key.B58String() }
216 217 218 219 220

	// debug logging niceness.
	reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key)
	log.Debugf("%s begin", reqDesc)
	defer log.Debugf("%s end", reqDesc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221 222

	// check if we have this value, to add ourselves as provider.
223
	has, err := dht.datastore.Has(key.DsKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
224
	if err != nil && err != ds.ErrNotFound {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225
		log.Debugf("unexpected datastore error: %v\n", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226 227 228 229
		has = false
	}

	// setup providers
230
	providers := dht.providers.GetProviders(ctx, key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
231 232
	if has {
		providers = append(providers, dht.self)
233
		log.Debugf("%s have the value. added self as provider", reqDesc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234 235 236
	}

	if providers != nil && len(providers) > 0 {
Jeromy's avatar
Jeromy committed
237
		infos := pstore.PeerInfos(dht.peerstore, providers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
238
		resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
239
		log.Debugf("%s have %d providers: %s", reqDesc, len(providers), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
240 241 242
	}

	// Also send closer peers.
243
	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
244
	if closer != nil {
Jeromy's avatar
Jeromy committed
245
		infos := pstore.PeerInfos(dht.peerstore, closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
246
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
247
		log.Debugf("%s have %d closer peers: %s", reqDesc, len(closer), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
248 249 250 251 252
	}

	return resp, nil
}

253
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
254 255 256 257
	lm := make(lgbl.DeferredMap)
	lm["peer"] = func() interface{} { return p.Pretty() }

	defer log.EventBegin(ctx, "handleAddProvider", lm).Done()
258
	key := key.Key(pmes.GetKey())
Michael Muré's avatar
Michael Muré committed
259
	lm["key"] = func() interface{} { return key.B58String() }
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
260

261
	log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
262

263
	// add provider should use the address given in the message
264 265 266 267 268
	pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
	for _, pi := range pinfos {
		if pi.ID != p {
			// we should ignore this provider reccord! not from originator.
			// (we chould sign them and check signature later...)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
269
			log.Debugf("handleAddProvider received provider %s from %s. Ignore.", pi.ID, p)
270 271
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
272

273
		if len(pi.Addrs) < 1 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
274
			log.Debugf("%s got no valid addresses for provider %s. Ignore.", dht.self, p)
275 276
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
277

278
		log.Infof("received provider %s for %s (addrs: %s)", p, key, pi.Addrs)
279
		if pi.ID != dht.self { // dont add own addrs.
280
			// add the received addresses to our peerstore.
Jeromy's avatar
Jeromy committed
281
			dht.peerstore.AddAddrs(pi.ID, pi.Addrs, pstore.ProviderAddrTTL)
282
		}
283
		dht.providers.AddProvider(ctx, key, p)
284 285
	}

Jeromy's avatar
Jeromy committed
286
	return nil, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
287
}