handlers.go 6.76 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
package dht

import (
	"errors"
	"fmt"
	"time"

	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
	u "github.com/jbenet/go-ipfs/util"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
)

// dhtHandler specifies the signature of functions that handle DHT messages.
//
// A handler receives the remote peer and the decoded request message, and
// returns the response message to send back (nil when there is none) together
// with any error encountered while handling the request.
type dhtHandler func(*peer.Peer, *Message) (*Message, error)

func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
	switch t {
	case Message_GET_VALUE:
		return dht.handleGetValue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25
	case Message_PUT_VALUE:
		return dht.handlePutValue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27
	case Message_FIND_NODE:
		return dht.handleFindPeer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28 29 30 31
	case Message_ADD_PROVIDER:
		return dht.handleAddProvider
	case Message_GET_PROVIDERS:
		return dht.handleGetProviders
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32 33
	case Message_PING:
		return dht.handlePing
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34 35
	case Message_DIAGNOSTIC:
		return dht.handleDiagnostic
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36 37 38 39 40 41 42
	default:
		return nil
	}
}

func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error {
	typ := Message_PUT_VALUE
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43 44
	pmes := newMessage(Message_PUT_VALUE, string(key), 0)
	pmes.Value = value
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45 46 47 48 49 50 51 52 53 54 55 56

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return err
	}
	return dht.sender.SendMessage(context.TODO(), mes)
}

func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error) {
	u.DOut("handleGetValue for key: %s\n", pmes.GetKey())

	// setup response
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57
	resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109

	// first, is the key even a key?
	key := pmes.GetKey()
	if key == "" {
		return nil, errors.New("handleGetValue but no key was provided")
	}

	// let's first check if we have the value locally.
	dskey := ds.NewKey(pmes.GetKey())
	iVal, err := dht.datastore.Get(dskey)

	// if we got an unexpected error, bail.
	if err != ds.ErrNotFound {
		return nil, err
	}

	// if we have the value, respond with it!
	if err == nil {
		u.DOut("handleGetValue success!\n")

		byts, ok := iVal.([]byte)
		if !ok {
			return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
		}

		resp.Value = byts
		return resp, nil
	}

	// if we know any providers for the requested value, return those.
	provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
	if len(provs) > 0 {
		u.DOut("handleGetValue returning %d provider[s]\n", len(provs))
		resp.ProviderPeers = peersToPBPeers(provs)
		return resp, nil
	}

	// Find closest peer on given cluster to desired key and reply with that info
	closer := dht.betterPeerToQuery(pmes)
	if closer == nil {
		u.DOut("handleGetValue could not find a closer node than myself.\n")
		resp.CloserPeers = nil
		return resp, nil
	}

	// we got a closer peer, it seems. return it.
	u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
	resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
	return resp, nil
}

// Store a value in this peer local storage
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
111 112 113 114
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
115
	return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116 117 118 119
}

func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) {
	u.DOut("[%s] Responding to ping from [%s]!\n", dht.self.ID.Pretty(), p.ID.Pretty())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120 121

	return newMessage(pmes.GetType(), "", int(pmes.GetClusterLevel())), nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
122 123 124
}

func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125
	resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
	var closest *peer.Peer

	// if looking for self... special case where we send it on CloserPeers.
	if peer.ID(pmes.GetKey()).Equal(dht.self.ID) {
		closest = dht.self
	} else {
		closest = dht.betterPeerToQuery(pmes)
	}

	if closest == nil {
		u.PErr("handleFindPeer: could not find anything.\n")
		return resp, nil
	}

	if len(closest.Addresses) == 0 {
		u.PErr("handleFindPeer: no addresses for connected peer...\n")
		return resp, nil
	}

	u.DOut("handleFindPeer: sending back '%s'\n", closest.ID.Pretty())
	resp.CloserPeers = peersToPBPeers([]*peer.Peer{closest})
	return resp, nil
}

func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151
	resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184

	// check if we have this value, to add ourselves as provider.
	has, err := dht.datastore.Has(ds.NewKey(pmes.GetKey()))
	if err != nil && err != ds.ErrNotFound {
		u.PErr("unexpected datastore error: %v\n", err)
		has = false
	}

	// setup providers
	providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
	if has {
		providers = append(providers, dht.self)
	}

	// if we've got providers, send thos those.
	if providers != nil && len(providers) > 0 {
		resp.ProviderPeers = peersToPBPeers(providers)
	}

	// Also send closer peers.
	closer := dht.betterPeerToQuery(pmes)
	if closer != nil {
		resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
	}

	return resp, nil
}

// providerInfo pairs a peer with a timestamp.
// NOTE(review): this type is not referenced in the visible part of this file;
// presumably Creation records when the provider entry was added and Value is
// the providing peer — confirm against its users.
type providerInfo struct {
	Creation time.Time
	Value    *peer.Peer
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186
	key := u.Key(pmes.GetKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188 189
	u.DOut("[%s] Adding [%s] as a provider for '%s'\n",
		dht.self.ID.Pretty(), p.ID.Pretty(), peer.ID(key).Pretty())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191
	dht.providers.AddProvider(key, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192
	return nil, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
}

// Halt stops all communications from this peer and shuts it down.
//
// It sends a signal on dht.shutdown, closes dht.network, and halts
// dht.providers, in that order.
// NOTE(review): the send on dht.shutdown blocks if the channel is unbuffered
// and nothing is receiving — confirm against where the channel is created.
// TODO -- remove this in favor of context
func (dht *IpfsDHT) Halt() {
	dht.shutdown <- struct{}{}
	dht.network.Close()
	dht.providers.Halt()
}

// handleDiagnostic is meant to service DIAGNOSTIC requests.
//
// NOTE: not yet finished, low priority. At present it asks the first routing
// table for the 10 peers nearest to this node, checks that the request can be
// wrapped as a network message for each of them (logging any failure), and
// then always returns the error "not yet ported back" with a nil response.
// Nothing is actually sent; the forwarding call and the aggregation logic
// below are commented out pending the port.
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) {
	seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)

	for _, ps := range seq {
		// The built message is discarded until the send below is restored;
		// binding it to a name would fail to compile as an unused variable.
		_, err := msg.FromObject(ps, pmes)
		if err != nil {
			u.PErr("handleDiagnostics error creating message: %v\n", err)
			continue
		}
		// dht.sender.SendRequest(context.TODO(), mes)
	}
	return nil, errors.New("not yet ported back")

	// 	buf := new(bytes.Buffer)
	// 	di := dht.getDiagInfo()
	// 	buf.Write(di.Marshal())
	//
	// 	// NOTE: this shouldnt be a hardcoded value
	// 	after := time.After(time.Second * 20)
	// 	count := len(seq)
	// 	for count > 0 {
	// 		select {
	// 		case <-after:
	// 			//Timeout, return what we have
	// 			goto out
	// 		case reqResp := <-listenChan:
	// 			pmesOut := new(Message)
	// 			err := proto.Unmarshal(reqResp.Data, pmesOut)
	// 			if err != nil {
	// 				// It broke? eh, whatever, keep going
	// 				continue
	// 			}
	// 			buf.Write(reqResp.Data)
	// 			count--
	// 		}
	// 	}
	//
	// out:
	// 	resp := Message{
	// 		Type:     Message_DIAGNOSTIC,
	// 		ID:       pmes.GetId(),
	// 		Value:    buf.Bytes(),
	// 		Response: true,
	// 	}
	//
	// 	mes := swarm.NewMessage(p, resp.ToProtobuf())
	// 	dht.netChan.Outgoing <- mes
}