handlers.go 8.64 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4 5
package dht

import (
	"errors"
	"fmt"
6
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7

8
	proto "github.com/gogo/protobuf/proto"
George Antoniadis's avatar
George Antoniadis committed
9
	ds "github.com/ipfs/go-datastore"
10
	u "github.com/ipfs/go-ipfs-util"
George Antoniadis's avatar
George Antoniadis committed
11 12
	key "github.com/ipfs/go-key"
	lgbl "github.com/ipfs/go-libp2p-loggables"
13 14
	peer "github.com/ipfs/go-libp2p-peer"
	pstore "github.com/ipfs/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
15
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
George Antoniadis's avatar
George Antoniadis committed
16
	recpb "github.com/libp2p/go-libp2p-record/pb"
17
	context "golang.org/x/net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

Jeromy's avatar
Jeromy committed
20
// The number of closer peers to send on requests.
Jeromy's avatar
Jeromy committed
21
var CloserPeerCount = KValue
22

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
// dhthandler specifies the signature of functions that handle DHT messages.
24
type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25

26
func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27
	switch t {
28
	case pb.Message_GET_VALUE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29
		return dht.handleGetValue
30
	case pb.Message_PUT_VALUE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31
		return dht.handlePutValue
32
	case pb.Message_FIND_NODE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33
		return dht.handleFindPeer
34
	case pb.Message_ADD_PROVIDER:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35
		return dht.handleAddProvider
36
	case pb.Message_GET_PROVIDERS:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37
		return dht.handleGetProviders
38
	case pb.Message_PING:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39 40 41 42 43 44
		return dht.handlePing
	default:
		return nil
	}
}

45
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
46
	defer log.EventBegin(ctx, "handleGetValue", p).Done()
Jeromy's avatar
Jeromy committed
47
	log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48 49

	// setup response
50
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51

52
	// first, is there even a key?
53
	k := key.Key(pmes.GetKey())
54
	if k == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55
		return nil, errors.New("handleGetValue but no key was provided")
56
		// TODO: send back an error response? could be bad, but the other node's hanging.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57 58
	}

59 60
	rec, err := dht.checkLocalDatastore(k)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61 62
		return nil, err
	}
63
	resp.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65

	// Find closest peer on given cluster to desired key and reply with that info
66
	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Jeromy's avatar
Jeromy committed
67
	if len(closer) > 0 {
Jeromy's avatar
Jeromy committed
68
		closerinfos := pstore.PeerInfos(dht.peerstore, closer)
69 70 71
		for _, pi := range closerinfos {
			log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
			if len(pi.Addrs) < 1 {
rht's avatar
rht committed
72
				log.Errorf(`no addresses on peer being sent!
73 74 75
					[local:%s]
					[sending:%s]
					[remote:%s]`, dht.self, pi.ID, p)
Jeromy's avatar
Jeromy committed
76
			}
77
		}
78

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80 81 82 83 84
	}

	return resp, nil
}

George Antoniadis's avatar
George Antoniadis committed
85
func (dht *IpfsDHT) checkLocalDatastore(k key.Key) (*recpb.Record, error) {
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
	log.Debugf("%s handleGetValue looking into ds", dht.self)
	dskey := k.DsKey()
	iVal, err := dht.datastore.Get(dskey)
	log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)

	if err == ds.ErrNotFound {
		return nil, nil
	}

	// if we got an unexpected error, bail.
	if err != nil {
		return nil, err
	}

	// if we have the value, send it back
	log.Debugf("%s handleGetValue success!", dht.self)

	byts, ok := iVal.([]byte)
	if !ok {
		return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
	}

George Antoniadis's avatar
George Antoniadis committed
108
	rec := new(recpb.Record)
109 110
	err = proto.Unmarshal(byts, rec)
	if err != nil {
111
		log.Debug("failed to unmarshal DHT record from datastore")
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
		return nil, err
	}

	// if its our record, dont bother checking the times on it
	if peer.ID(rec.GetAuthor()) == dht.self {
		return rec, nil
	}

	var recordIsBad bool
	recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
	if err != nil {
		log.Info("either no receive time set on record, or it was invalid: ", err)
		recordIsBad = true
	}

	if time.Now().Sub(recvtime) > MaxRecordAge {
		log.Debug("old record found, tossing.")
		recordIsBad = true
	}

132
	// NOTE: We do not verify the record here beyond checking these timestamps.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
	// we put the burden of checking the records on the requester as checking a record
	// may be computationally expensive

	if recordIsBad {
		err := dht.datastore.Delete(dskey)
		if err != nil {
			log.Error("Failed to delete bad record from datastore: ", err)
		}

		return nil, nil // can treat this as not having the record at all
	}

	return rec, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148
// Store a value in this peer local storage
149
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
150
	defer log.EventBegin(ctx, "handlePutValue", p).Done()
151
	dskey := key.Key(pmes.GetKey()).DsKey()
152

153 154 155 156 157 158 159
	rec := pmes.GetRecord()
	if rec == nil {
		log.Infof("Got nil record from: %s", p.Pretty())
		return nil, errors.New("nil record")
	}

	if err := dht.verifyRecordLocally(rec); err != nil {
160
		log.Warningf("Bad dht record in PUT from: %s. %s", key.Key(pmes.GetRecord().GetAuthor()), err)
161 162 163
		return nil, err
	}

164 165 166 167
	// record the time we receive every record
	rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))

	data, err := proto.Marshal(rec)
168 169 170 171 172
	if err != nil {
		return nil, err
	}

	err = dht.datastore.Put(dskey, data)
Jeromy's avatar
Jeromy committed
173
	log.Debugf("%s handlePutValue %v", dht.self, dskey)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174
	return pmes, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
175 176
}

177
func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178
	log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
179
	return pmes, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
180 181
}

182
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
183
	defer log.EventBegin(ctx, "handleFindPeer", p).Done()
184
	resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
185
	var closest []peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186 187

	// if looking for self... special case where we send it on CloserPeers.
188 189
	if peer.ID(pmes.GetKey()) == dht.self {
		closest = []peer.ID{dht.self}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190
	} else {
191
		closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192 193 194
	}

	if closest == nil {
195
		log.Infof("%s handleFindPeer %s: could not find anything.", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
196 197 198
		return resp, nil
	}

Jeromy's avatar
Jeromy committed
199 200
	var withAddresses []pstore.PeerInfo
	closestinfos := pstore.PeerInfos(dht.peerstore, closest)
201 202 203 204
	for _, pi := range closestinfos {
		if len(pi.Addrs) > 0 {
			withAddresses = append(withAddresses, pi)
			log.Debugf("handleFindPeer: sending back '%s'", pi.ID)
205
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
206 207
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208
	resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), withAddresses)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209 210 211
	return resp, nil
}

212
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213 214 215 216
	lm := make(lgbl.DeferredMap)
	lm["peer"] = func() interface{} { return p.Pretty() }
	defer log.EventBegin(ctx, "handleGetProviders", lm).Done()

217
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
218
	key := key.Key(pmes.GetKey())
Michael Muré's avatar
Michael Muré committed
219
	lm["key"] = func() interface{} { return key.B58String() }
220 221 222 223 224

	// debug logging niceness.
	reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key)
	log.Debugf("%s begin", reqDesc)
	defer log.Debugf("%s end", reqDesc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225 226

	// check if we have this value, to add ourselves as provider.
227
	has, err := dht.datastore.Has(key.DsKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228
	if err != nil && err != ds.ErrNotFound {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
229
		log.Debugf("unexpected datastore error: %v\n", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
230 231 232 233
		has = false
	}

	// setup providers
234
	providers := dht.providers.GetProviders(ctx, key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
235 236
	if has {
		providers = append(providers, dht.self)
237
		log.Debugf("%s have the value. added self as provider", reqDesc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
238 239 240
	}

	if providers != nil && len(providers) > 0 {
Jeromy's avatar
Jeromy committed
241
		infos := pstore.PeerInfos(dht.peerstore, providers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
242
		resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
243
		log.Debugf("%s have %d providers: %s", reqDesc, len(providers), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
244 245 246
	}

	// Also send closer peers.
247
	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
248
	if closer != nil {
Jeromy's avatar
Jeromy committed
249
		infos := pstore.PeerInfos(dht.peerstore, closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
250
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
251
		log.Debugf("%s have %d closer peers: %s", reqDesc, len(closer), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252 253 254 255 256
	}

	return resp, nil
}

257
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
258 259 260 261
	lm := make(lgbl.DeferredMap)
	lm["peer"] = func() interface{} { return p.Pretty() }

	defer log.EventBegin(ctx, "handleAddProvider", lm).Done()
262
	key := key.Key(pmes.GetKey())
Michael Muré's avatar
Michael Muré committed
263
	lm["key"] = func() interface{} { return key.B58String() }
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
264

265
	log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
266

267
	// add provider should use the address given in the message
268 269 270 271 272
	pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
	for _, pi := range pinfos {
		if pi.ID != p {
			// we should ignore this provider reccord! not from originator.
			// (we chould sign them and check signature later...)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
273
			log.Debugf("handleAddProvider received provider %s from %s. Ignore.", pi.ID, p)
274 275
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
276

277
		if len(pi.Addrs) < 1 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
278
			log.Debugf("%s got no valid addresses for provider %s. Ignore.", dht.self, p)
279 280
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
281

282
		log.Infof("received provider %s for %s (addrs: %s)", p, key, pi.Addrs)
283
		if pi.ID != dht.self { // dont add own addrs.
284
			// add the received addresses to our peerstore.
Jeromy's avatar
Jeromy committed
285
			dht.peerstore.AddAddrs(pi.ID, pi.Addrs, pstore.ProviderAddrTTL)
286
		}
287
		dht.providers.AddProvider(ctx, key, p)
288 289
	}

Jeromy's avatar
Jeromy committed
290
	return nil, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
291
}