handlers.go 6.95 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
package dht

import (
	"errors"
	"fmt"
	"time"

	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
	u "github.com/jbenet/go-ipfs/util"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
)

// dhthandler specifies the signature of functions that handle DHT messages.
type dhtHandler func(*peer.Peer, *Message) (*Message, error)

func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
	switch t {
	case Message_GET_VALUE:
		return dht.handleGetValue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24
	case Message_PUT_VALUE:
		return dht.handlePutValue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25 26
	case Message_FIND_NODE:
		return dht.handleFindPeer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27 28 29 30
	case Message_ADD_PROVIDER:
		return dht.handleAddProvider
	case Message_GET_PROVIDERS:
		return dht.handleGetProviders
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31 32
	case Message_PING:
		return dht.handlePing
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33 34
	case Message_DIAGNOSTIC:
		return dht.handleDiagnostic
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35 36 37 38 39 40
	default:
		return nil
	}
}

func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error) {
41
	log.Debug("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42 43

	// setup response
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44
	resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45 46 47 48 49 50 51 52

	// first, is the key even a key?
	key := pmes.GetKey()
	if key == "" {
		return nil, errors.New("handleGetValue but no key was provided")
	}

	// let's first check if we have the value locally.
53
	log.Debug("%s handleGetValue looking into ds\n", dht.self)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54
	dskey := u.Key(pmes.GetKey()).DsKey()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55
	iVal, err := dht.datastore.Get(dskey)
56
	log.Debug("%s handleGetValue looking into ds GOT %v\n", dht.self, iVal)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57 58

	// if we got an unexpected error, bail.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59
	if err != nil && err != ds.ErrNotFound {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60 61 62
		return nil, err
	}

63 64 65 66
	// Note: changed the behavior here to return _as much_ info as possible
	// (potentially all of {value, closer peers, provider})

	// if we have the value, send it back
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67
	if err == nil {
68
		log.Debug("%s handleGetValue success!\n", dht.self)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69 70 71 72 73 74 75 76 77 78 79 80

		byts, ok := iVal.([]byte)
		if !ok {
			return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
		}

		resp.Value = byts
	}

	// if we know any providers for the requested value, return those.
	provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
	if len(provs) > 0 {
81
		log.Debug("handleGetValue returning %d provider[s]\n", len(provs))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82 83 84 85 86
		resp.ProviderPeers = peersToPBPeers(provs)
	}

	// Find closest peer on given cluster to desired key and reply with that info
	closer := dht.betterPeerToQuery(pmes)
87
	if closer != nil {
88
		log.Debug("handleGetValue returning a closer peer: '%s'\n", closer)
89
		resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90 91 92 93 94 95
	}

	return resp, nil
}

// Store a value in this peer local storage
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
96
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
97 98
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99
	dskey := u.Key(pmes.GetKey()).DsKey()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
100
	err := dht.datastore.Put(dskey, pmes.GetValue())
101
	log.Debug("%s handlePutValue %v %v\n", dht.self, dskey, pmes.GetValue())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102
	return pmes, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
103 104 105
}

func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) {
106
	log.Debug("%s Responding to ping from %s!\n", dht.self, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107
	return pmes, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108 109 110
}

func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
111
	resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112 113 114 115 116 117 118 119 120 121
	var closest *peer.Peer

	// if looking for self... special case where we send it on CloserPeers.
	if peer.ID(pmes.GetKey()).Equal(dht.self.ID) {
		closest = dht.self
	} else {
		closest = dht.betterPeerToQuery(pmes)
	}

	if closest == nil {
122
		log.Error("handleFindPeer: could not find anything.\n")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
123 124 125 126
		return resp, nil
	}

	if len(closest.Addresses) == 0 {
127
		log.Error("handleFindPeer: no addresses for connected peer...\n")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
128 129 130
		return resp, nil
	}

131
	log.Debug("handleFindPeer: sending back '%s'\n", closest)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132 133 134 135 136
	resp.CloserPeers = peersToPBPeers([]*peer.Peer{closest})
	return resp, nil
}

func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
137
	resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138 139

	// check if we have this value, to add ourselves as provider.
140
	log.Debug("handling GetProviders: '%s'", pmes.GetKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141 142
	dsk := u.Key(pmes.GetKey()).DsKey()
	has, err := dht.datastore.Has(dsk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
143
	if err != nil && err != ds.ErrNotFound {
144
		log.Error("unexpected datastore error: %v\n", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
		has = false
	}

	// setup providers
	providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
	if has {
		providers = append(providers, dht.self)
	}

	// if we've got providers, send thos those.
	if providers != nil && len(providers) > 0 {
		resp.ProviderPeers = peersToPBPeers(providers)
	}

	// Also send closer peers.
	closer := dht.betterPeerToQuery(pmes)
	if closer != nil {
		resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
	}

	return resp, nil
}

type providerInfo struct {
	Creation time.Time
	Value    *peer.Peer
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
173
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174
	key := u.Key(pmes.GetKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
175

176
	log.Debug("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
177

178
	// add provider should use the address given in the message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
179 180 181
	for _, pb := range pmes.GetProviderPeers() {
		pid := peer.ID(pb.GetId())
		if pid.Equal(p.ID) {
182 183 184 185 186 187

			addr, err := pb.Address()
			if err != nil {
				log.Error("provider %s error with address %s", p, *pb.Addr)
				continue
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188 189

			log.Info("received provider %s %s for %s", p, addr, key)
190
			p.AddAddress(addr)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191 192 193 194
			dht.providers.AddProvider(key, p)

		} else {
			log.Error("handleAddProvider received provider %s from %s", pid, p)
195 196 197
		}
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198
	return pmes, nil // send back same msg as confirmation.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
199 200 201 202 203 204 205 206 207 208 209 210 211
}

// Halt stops all communications from this peer and shut down
// TODO -- remove this in favor of context
func (dht *IpfsDHT) Halt() {
	dht.providers.Halt()
}

// NOTE: not yet finished, low priority
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) {
	seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)

	for _, ps := range seq {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212
		_, err := msg.FromObject(ps, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213
		if err != nil {
214
			log.Error("handleDiagnostics error creating message: %v\n", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
			continue
		}
		// dht.sender.SendRequest(context.TODO(), mes)
	}
	return nil, errors.New("not yet ported back")

	// 	buf := new(bytes.Buffer)
	// 	di := dht.getDiagInfo()
	// 	buf.Write(di.Marshal())
	//
	// 	// NOTE: this shouldnt be a hardcoded value
	// 	after := time.After(time.Second * 20)
	// 	count := len(seq)
	// 	for count > 0 {
	// 		select {
	// 		case <-after:
	// 			//Timeout, return what we have
	// 			goto out
	// 		case reqResp := <-listenChan:
	// 			pmesOut := new(Message)
	// 			err := proto.Unmarshal(reqResp.Data, pmesOut)
	// 			if err != nil {
	// 				// It broke? eh, whatever, keep going
	// 				continue
	// 			}
	// 			buf.Write(reqResp.Data)
	// 			count--
	// 		}
	// 	}
	//
	// out:
	// 	resp := Message{
	// 		Type:     Message_DIAGNOSTIC,
	// 		ID:       pmes.GetId(),
	// 		Value:    buf.Bytes(),
	// 		Response: true,
	// 	}
	//
	// 	mes := swarm.NewMessage(p, resp.ToProtobuf())
	// 	dht.netChan.Outgoing <- mes
}