package dht

import (
	"errors"
	"fmt"
	"time"

	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
	u "github.com/jbenet/go-ipfs/util"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
)

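// CloserPeerCount is the number of closer peers to return with a response.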
var CloserPeerCount = 4

// dhtHandler specifies the signature of functions that handle DHT messages.
type dhtHandler func(*peer.Peer, *Message) (*Message, error)

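// handlerForMsgType returns the handler for the given message type,
// or nil if the type is not recognized.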
func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
	switch t {
	case Message_GET_VALUE:
		return dht.handleGetValue
	case Message_PUT_VALUE:
		return dht.handlePutValue
	case Message_FIND_NODE:
		return dht.handleFindPeer
	case Message_ADD_PROVIDER:
		return dht.handleAddProvider
	case Message_GET_PROVIDERS:
		return dht.handleGetProviders
	case Message_PING:
		return dht.handlePing
	case Message_DIAGNOSTIC:
		return dht.handleDiagnostic
	default:
		return nil
	}
}

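// handleGetValue responds to a GET_VALUE request with as much information
// as we have: the value itself (if stored locally), any known providers,
// and closer peers.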
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error) {
	log.Debug("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())

	// setup response
	resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

	// first, is the key even a key?
	key := pmes.GetKey()
	if key == "" {
		return nil, errors.New("handleGetValue but no key was provided")
	}

	// let's first check if we have the value locally.
	log.Debug("%s handleGetValue looking into ds\n", dht.self)
	dskey := u.Key(pmes.GetKey()).DsKey()
	iVal, err := dht.datastore.Get(dskey)
	log.Debug("%s handleGetValue looking into ds GOT %v\n", dht.self, iVal)

	// if we got an unexpected error, bail.
	if err != nil && err != ds.ErrNotFound {
		return nil, err
	}

	// Note: changed the behavior here to return _as much_ info as possible
	// (potentially all of {value, closer peers, provider})

	// if we have the value, send it back
	if err == nil {
		log.Debug("%s handleGetValue success!\n", dht.self)

		byts, ok := iVal.([]byte)
		if !ok {
			return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
		}

		resp.Value = byts
	}

	// if we know any providers for the requested value, return those.
	provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
	if len(provs) > 0 {
		log.Debug("handleGetValue returning %d provider[s]\n", len(provs))
		resp.ProviderPeers = peersToPBPeers(provs)
	}

	// Find the closest peers on the given cluster to the desired key and reply with that info
	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
	if closer != nil {
		for _, p := range closer {
			log.Debug("handleGetValue returning closer peer: '%s'", p)
		}
		resp.CloserPeers = peersToPBPeers(closer)
	}

	return resp, nil
}

// handlePutValue stores a value in this peer's local datastore.
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error) {
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
	dskey := u.Key(pmes.GetKey()).DsKey()
	err := dht.datastore.Put(dskey, pmes.GetValue())
	log.Debug("%s handlePutValue %v %v\n", dht.self, dskey, pmes.GetValue())
	return pmes, err
}

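// handlePing responds to a ping by echoing the request message back.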
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) {
	log.Debug("%s Responding to ping from %s!\n", dht.self, p)
	return pmes, nil
}

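// handleFindPeer replies with the closest peers we know to the requested
// key, limited to those whose addresses are known.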
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) {
	resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel())
	var closest []*peer.Peer

	// if looking for self... special case where we send it on CloserPeers.
	if peer.ID(pmes.GetKey()).Equal(dht.self.ID) {
		closest = []*peer.Peer{dht.self}
	} else {
		closest = dht.betterPeersToQuery(pmes, CloserPeerCount)
	}

	if closest == nil {
		log.Error("handleFindPeer: could not find anything.")
		return resp, nil
	}

	var withAddresses []*peer.Peer
	for _, p := range closest {
		if len(p.Addresses) > 0 {
			withAddresses = append(withAddresses, p)
		}
	}

	for _, p := range withAddresses {
		log.Debug("handleFindPeer: sending back '%s'", p)
	}
	resp.CloserPeers = peersToPBPeers(withAddresses)
	return resp, nil
}

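// handleGetProviders replies with any known providers for the requested key,
// including ourselves if we hold the value locally, plus closer peers.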
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, error) {
	resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

	// check if we have this value, to add ourselves as provider.
	log.Debug("handling GetProviders: '%s'", pmes.GetKey())
	dsk := u.Key(pmes.GetKey()).DsKey()
	has, err := dht.datastore.Has(dsk)
	if err != nil && err != ds.ErrNotFound {
		log.Error("unexpected datastore error: %v\n", err)
		has = false
	}

	// setup providers
	providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
	if has {
		providers = append(providers, dht.self)
	}

	// if we've got providers, send those.
	if len(providers) > 0 {
		resp.ProviderPeers = peersToPBPeers(providers)
	}

	// Also send closer peers.
	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
	if closer != nil {
		resp.CloserPeers = peersToPBPeers(closer)
	}

	return resp, nil
}

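// providerInfo records a provider peer and the time it was added.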
type providerInfo struct {
	Creation time.Time
	Value    *peer.Peer
}

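// handleAddProvider records the sender as a provider for the given key,
// using the address supplied in the message.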
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, error) {
	key := u.Key(pmes.GetKey())

	log.Debug("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))

	// add provider should use the address given in the message
	for _, pb := range pmes.GetProviderPeers() {
		pid := peer.ID(pb.GetId())
		if pid.Equal(p.ID) {

			addr, err := pb.Address()
			if err != nil {
				log.Error("provider %s error with address %s", p, *pb.Addr)
				continue
			}

			log.Info("received provider %s %s for %s", p, addr, key)
			p.AddAddress(addr)
			dht.providers.AddProvider(key, p)

		} else {
			log.Error("handleAddProvider received provider %s from %s", pid, p)
		}
	}

	return pmes, nil // send back same msg as confirmation.
}

// Halt stops all communications from this peer and shuts down.
// TODO -- remove this in favor of context
func (dht *IpfsDHT) Halt() {
	dht.providers.Halt()
}

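// handleDiagnostic gathers diagnostic information from nearby peers.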
// NOTE: not yet finished, low priority
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) {
	seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)

	for _, ps := range seq {
		_, err := msg.FromObject(ps, pmes)
		if err != nil {
			log.Error("handleDiagnostics error creating message: %v\n", err)
			continue
		}
		// dht.sender.SendRequest(context.TODO(), mes)
	}
	return nil, errors.New("not yet ported back")

	// 	buf := new(bytes.Buffer)
	// 	di := dht.getDiagInfo()
	// 	buf.Write(di.Marshal())
	//
	// 	// NOTE: this shouldnt be a hardcoded value
	// 	after := time.After(time.Second * 20)
	// 	count := len(seq)
	// 	for count > 0 {
	// 		select {
	// 		case <-after:
	// 			//Timeout, return what we have
	// 			goto out
	// 		case reqResp := <-listenChan:
	// 			pmesOut := new(Message)
	// 			err := proto.Unmarshal(reqResp.Data, pmesOut)
	// 			if err != nil {
	// 				// It broke? eh, whatever, keep going
	// 				continue
	// 			}
	// 			buf.Write(reqResp.Data)
	// 			count--
	// 		}
	// 	}
	//
	// out:
	// 	resp := Message{
	// 		Type:     Message_DIAGNOSTIC,
	// 		ID:       pmes.GetId(),
	// 		Value:    buf.Bytes(),
	// 		Response: true,
	// 	}
	//
	// 	mes := swarm.NewMessage(p, resp.ToProtobuf())
	// 	dht.netChan.Outgoing <- mes
}