handlers.go 8.66 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4 5
package dht

import (
	"errors"
	"fmt"
6
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7

Jeromy's avatar
Jeromy committed
8
	ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore"
9
	key "github.com/ipfs/go-ipfs/blocks/key"
10
	pb "github.com/ipfs/go-ipfs/routing/dht/pb"
11 12 13
	lgbl "github.com/ipfs/go-ipfs/thirdparty/loggables"
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
	u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
14
	peer "gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
15
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

Jeromy's avatar
Jeromy committed
18
// CloserPeerCount is the number of closer peers to send on requests.
Jeromy's avatar
Jeromy committed
19
var CloserPeerCount = KValue // initialised from KValue; exported, so other packages can reassign it
20

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21
// dhtHandler specifies the signature of functions that handle DHT messages.
22
// A handler is called with the request context, the ID of the peer the
// message came from and the received message. It returns the response
// message (which may be nil) or an error.
type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23

24
func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25
	switch t {
26
	case pb.Message_GET_VALUE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27
		return dht.handleGetValue
28
	case pb.Message_PUT_VALUE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29
		return dht.handlePutValue
30
	case pb.Message_FIND_NODE:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31
		return dht.handleFindPeer
32
	case pb.Message_ADD_PROVIDER:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33
		return dht.handleAddProvider
34
	case pb.Message_GET_PROVIDERS:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35
		return dht.handleGetProviders
36
	case pb.Message_PING:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37 38 39 40 41 42
		return dht.handlePing
	default:
		return nil
	}
}

43
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
44
	defer log.EventBegin(ctx, "handleGetValue", p).Done()
Jeromy's avatar
Jeromy committed
45
	log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
46 47

	// setup response
48
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49

50
	// first, is there even a key?
51
	k := key.Key(pmes.GetKey())
52
	if k == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
53
		return nil, errors.New("handleGetValue but no key was provided")
54
		// TODO: send back an error response? could be bad, but the other node's hanging.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55 56
	}

57 58
	rec, err := dht.checkLocalDatastore(k)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59 60
		return nil, err
	}
61
	resp.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62 63

	// Find closest peer on given cluster to desired key and reply with that info
64
	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Jeromy's avatar
Jeromy committed
65
	if len(closer) > 0 {
66
		closerinfos := peer.PeerInfos(dht.peerstore, closer)
67 68 69
		for _, pi := range closerinfos {
			log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
			if len(pi.Addrs) < 1 {
rht's avatar
rht committed
70
				log.Errorf(`no addresses on peer being sent!
71 72 73
					[local:%s]
					[sending:%s]
					[remote:%s]`, dht.self, pi.ID, p)
Jeromy's avatar
Jeromy committed
74
			}
75
		}
76

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78 79 80 81 82
	}

	return resp, nil
}

83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
func (dht *IpfsDHT) checkLocalDatastore(k key.Key) (*pb.Record, error) {
	log.Debugf("%s handleGetValue looking into ds", dht.self)
	dskey := k.DsKey()
	iVal, err := dht.datastore.Get(dskey)
	log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)

	if err == ds.ErrNotFound {
		return nil, nil
	}

	// if we got an unexpected error, bail.
	if err != nil {
		return nil, err
	}

	// if we have the value, send it back
	log.Debugf("%s handleGetValue success!", dht.self)

	byts, ok := iVal.([]byte)
	if !ok {
		return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
	}

	rec := new(pb.Record)
	err = proto.Unmarshal(byts, rec)
	if err != nil {
Richard Littauer's avatar
Richard Littauer committed
109
		log.Debug("Failed to unmarshal DHT record from datastore.")
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
		return nil, err
	}

	// if its our record, dont bother checking the times on it
	if peer.ID(rec.GetAuthor()) == dht.self {
		return rec, nil
	}

	var recordIsBad bool
	recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
	if err != nil {
		log.Info("either no receive time set on record, or it was invalid: ", err)
		recordIsBad = true
	}

	if time.Now().Sub(recvtime) > MaxRecordAge {
		log.Debug("old record found, tossing.")
		recordIsBad = true
	}

130
	// NOTE: We do not verify the record here beyond checking these timestamps.
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
	// we put the burden of checking the records on the requester as checking a record
	// may be computationally expensive

	if recordIsBad {
		err := dht.datastore.Delete(dskey)
		if err != nil {
			log.Error("Failed to delete bad record from datastore: ", err)
		}

		return nil, nil // can treat this as not having the record at all
	}

	return rec, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146
// Store a value in this peer local storage
147
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
148
	defer log.EventBegin(ctx, "handlePutValue", p).Done()
149
	dskey := key.Key(pmes.GetKey()).DsKey()
150

151
	if err := dht.verifyRecordLocally(pmes.GetRecord()); err != nil {
152
		log.Warningf("Bad dht record in PUT from: %s. %s", key.Key(pmes.GetRecord().GetAuthor()), err)
153 154 155
		return nil, err
	}

156 157 158 159 160 161
	rec := pmes.GetRecord()

	// record the time we receive every record
	rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))

	data, err := proto.Marshal(rec)
162 163 164 165 166
	if err != nil {
		return nil, err
	}

	err = dht.datastore.Put(dskey, data)
Jeromy's avatar
Jeromy committed
167
	log.Debugf("%s handlePutValue %v", dht.self, dskey)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
168
	return pmes, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169 170
}

171
func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
172
	log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
173
	return pmes, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174 175
}

176
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
177
	defer log.EventBegin(ctx, "handleFindPeer", p).Done()
178
	resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
179
	var closest []peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
180 181

	// if looking for self... special case where we send it on CloserPeers.
182 183
	if peer.ID(pmes.GetKey()) == dht.self {
		closest = []peer.ID{dht.self}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
184
	} else {
185
		closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186 187 188
	}

	if closest == nil {
189
		log.Infof("%s handleFindPeer %s: could not find anything.", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190 191 192
		return resp, nil
	}

193 194 195 196 197 198
	var withAddresses []peer.PeerInfo
	closestinfos := peer.PeerInfos(dht.peerstore, closest)
	for _, pi := range closestinfos {
		if len(pi.Addrs) > 0 {
			withAddresses = append(withAddresses, pi)
			log.Debugf("handleFindPeer: sending back '%s'", pi.ID)
199
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
200 201
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202
	resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), withAddresses)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
203 204 205
	return resp, nil
}

206
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207 208 209 210
	lm := make(lgbl.DeferredMap)
	lm["peer"] = func() interface{} { return p.Pretty() }
	defer log.EventBegin(ctx, "handleGetProviders", lm).Done()

211
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
212
	key := key.Key(pmes.GetKey())
Michael Muré's avatar
Michael Muré committed
213
	lm["key"] = func() interface{} { return key.B58String() }
214 215 216 217 218

	// debug logging niceness.
	reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key)
	log.Debugf("%s begin", reqDesc)
	defer log.Debugf("%s end", reqDesc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219 220

	// check if we have this value, to add ourselves as provider.
221
	has, err := dht.datastore.Has(key.DsKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
222
	if err != nil && err != ds.ErrNotFound {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
223
		log.Debugf("unexpected datastore error: %v\n", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
224 225 226 227
		has = false
	}

	// setup providers
228
	providers := dht.providers.GetProviders(ctx, key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
229 230
	if has {
		providers = append(providers, dht.self)
231
		log.Debugf("%s have the value. added self as provider", reqDesc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
232 233 234
	}

	if providers != nil && len(providers) > 0 {
235
		infos := peer.PeerInfos(dht.peerstore, providers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
236
		resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
237
		log.Debugf("%s have %d providers: %s", reqDesc, len(providers), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
238 239 240
	}

	// Also send closer peers.
241
	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
242
	if closer != nil {
Jeromy's avatar
Jeromy committed
243
		infos := peer.PeerInfos(dht.peerstore, closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
244
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
245
		log.Debugf("%s have %d closer peers: %s", reqDesc, len(closer), infos)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
246 247 248 249 250
	}

	return resp, nil
}

251
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252 253 254 255
	lm := make(lgbl.DeferredMap)
	lm["peer"] = func() interface{} { return p.Pretty() }

	defer log.EventBegin(ctx, "handleAddProvider", lm).Done()
256
	key := key.Key(pmes.GetKey())
Michael Muré's avatar
Michael Muré committed
257
	lm["key"] = func() interface{} { return key.B58String() }
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
258

259
	log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
260

261
	// add provider should use the address given in the message
262 263 264 265 266
	pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
	for _, pi := range pinfos {
		if pi.ID != p {
			// we should ignore this provider reccord! not from originator.
			// (we chould sign them and check signature later...)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
267
			log.Debugf("handleAddProvider received provider %s from %s. Ignore.", pi.ID, p)
268 269
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
270

271
		if len(pi.Addrs) < 1 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
272
			log.Debugf("%s got no valid addresses for provider %s. Ignore.", dht.self, p)
273 274
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
275

276
		log.Infof("received provider %s for %s (addrs: %s)", p, key, pi.Addrs)
277
		if pi.ID != dht.self { // dont add own addrs.
278
			// add the received addresses to our peerstore.
279
			dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peer.ProviderAddrTTL)
280
		}
281
		dht.providers.AddProvider(ctx, key, p)
282 283
	}

Jeromy's avatar
Jeromy committed
284
	return nil, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
285
}