routing.go 7.18 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
5
	"encoding/json"
6 7 8
	"errors"
	"math/rand"
	"time"
9

10 11
	proto "code.google.com/p/goprotobuf/proto"

12 13
	ma "github.com/jbenet/go-multiaddr"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	peer "github.com/jbenet/go-ipfs/peer"
15
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

20 21 22
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6

23 24
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
25 26
	//return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
	return uint64(rand.Uint32())
27 28
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29 30 31 32 33
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
34
// This is the top level "Store" operation of the DHT
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
36
	var p *peer.Peer
37
	p = s.routes[0].NearestPeer(kb.ConvertKey(key))
38
	if p == nil {
39
		return errors.New("Table returned nil peer!")
40
	}
41

42
	return s.putValueToPeer(p, string(key), value)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43 44 45
}

// GetValue searches for the value corresponding to given Key.
Jeromy's avatar
Jeromy committed
46 47
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
49
	var p *peer.Peer
50
	p = s.routes[0].NearestPeer(kb.ConvertKey(key))
51
	if p == nil {
52
		return nil, errors.New("Table returned nil peer!")
53
	}
54

55 56 57 58
	pmes := DHTMessage{
		Type: PBDHTMessage_GET_VALUE,
		Key:  string(key),
		Id:   GenerateMessageID(),
59
	}
60
	response_chan := s.ListenFor(pmes.Id, 1, time.Minute)
61

62 63
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
	s.network.Chan.Outgoing <- mes
64

65
	// Wait for either the response or a timeout
66 67
	timeup := time.After(timeout)
	select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68
	case <-timeup:
69
		s.Unlisten(pmes.Id)
70
		return nil, u.ErrTimeout
71 72
	case resp, ok := <-response_chan:
		if !ok {
73 74
			u.PErr("response channel closed before timeout, please investigate.")
			return nil, u.ErrTimeout
75
		}
76
		pmes_out := new(PBDHTMessage)
77 78
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
79
			return nil, err
80
		}
Jeromy's avatar
Jeromy committed
81 82 83 84 85
		if pmes_out.GetSuccess() {
			return pmes_out.GetValue(), nil
		} else {
			return pmes_out.GetValue(), u.ErrSearchIncomplete
		}
86
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87 88 89 90 91 92
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

// Announce that this node can provide value for given key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93
func (s *IpfsDHT) Provide(key u.Key) error {
94
	peers := s.routes[0].NearestPeers(kb.ConvertKey(key), PoolSize)
95 96 97 98
	if len(peers) == 0 {
		//return an error
	}

99 100 101
	pmes := DHTMessage{
		Type: PBDHTMessage_ADD_PROVIDER,
		Key:  string(key),
102 103 104
	}
	pbmes := pmes.ToProtobuf()

105
	for _, p := range peers {
106
		mes := swarm.NewMessage(p, pbmes)
107
		s.network.Chan.Outgoing <- mes
108 109
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110 111 112
}

// FindProviders searches for peers who can provide the value for given key.
113
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
114
	p := s.routes[0].NearestPeer(kb.ConvertKey(key))
115

116 117 118 119
	pmes := DHTMessage{
		Type: PBDHTMessage_GET_PROVIDERS,
		Key:  string(key),
		Id:   GenerateMessageID(),
120 121 122 123
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

124
	listen_chan := s.ListenFor(pmes.Id, 1, time.Minute)
125
	u.DOut("Find providers for: '%s'", key)
126
	s.network.Chan.Outgoing <- mes
127 128 129 130 131 132
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp := <-listen_chan:
133
		u.DOut("FindProviders: got response.")
134
		pmes_out := new(PBDHTMessage)
135 136 137 138
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
139 140
		var addrs map[u.Key]string
		err = json.Unmarshal(pmes_out.GetValue(), &addrs)
141 142 143 144
		if err != nil {
			return nil, err
		}

145
		var prov_arr []*peer.Peer
146
		for pid, addr := range addrs {
147
			p := s.network.Find(pid)
148
			if p == nil {
149
				maddr, err := ma.NewMultiaddr(addr)
150 151 152 153
				if err != nil {
					u.PErr("error connecting to new peer: %s", err)
					continue
				}
154
				p, err = s.Connect(maddr)
155 156 157 158 159
				if err != nil {
					u.PErr("error connecting to new peer: %s", err)
					continue
				}
			}
160 161
			s.addProviderEntry(key, p)
			prov_arr = append(prov_arr, p)
162 163
		}

164
		return prov_arr, nil
165
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166 167 168 169 170 171
}

// Find specific Peer

// FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
172
	p := s.routes[0].NearestPeer(kb.ConvertPeerID(id))
173

174 175 176 177
	pmes := DHTMessage{
		Type: PBDHTMessage_FIND_NODE,
		Key:  string(id),
		Id:   GenerateMessageID(),
178 179 180 181
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

182
	listen_chan := s.ListenFor(pmes.Id, 1, time.Minute)
183
	s.network.Chan.Outgoing <- mes
184 185 186 187 188 189
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp := <-listen_chan:
190
		pmes_out := new(PBDHTMessage)
191 192 193 194
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
Jeromy's avatar
Jeromy committed
195 196 197 198 199 200
		addr := string(pmes_out.GetValue())
		maddr, err := ma.NewMultiaddr(addr)
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
201 202 203 204 205 206 207 208 209 210 211 212
		found_peer, err := s.Connect(maddr)
		if err != nil {
			u.POut("Found peer but couldnt connect.")
			return nil, err
		}

		if !found_peer.ID.Equal(id) {
			u.POut("FindPeer: searching for '%s' but found '%s'", id.Pretty(), found_peer.ID.Pretty())
			return found_peer, u.ErrSearchIncomplete
		}

		return found_peer, nil
213
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214
}
215 216 217 218 219 220

// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
	// Thoughts: maybe this should accept an ID and do a peer lookup?
	u.DOut("Enter Ping.")

221
	pmes := DHTMessage{Id: GenerateMessageID(), Type: PBDHTMessage_PING}
222 223 224
	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	before := time.Now()
225
	response_chan := dht.ListenFor(pmes.Id, 1, time.Minute)
226 227 228 229 230 231
	dht.network.Chan.Outgoing <- mes

	tout := time.After(timeout)
	select {
	case <-response_chan:
		roundtrip := time.Since(before)
232
		p.SetLatency(roundtrip)
233 234 235 236 237 238 239 240 241
		u.POut("Ping took %s.", roundtrip.String())
		return nil
	case <-tout:
		// Timed out, think about removing peer from network
		u.DOut("Ping peer timed out.")
		dht.Unlisten(pmes.Id)
		return u.ErrTimeout
	}
}
242 243 244 245

func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
	u.DOut("Begin Diagnostic")
	//Send to N closest peers
246
	targets := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
247 248

	// TODO: Add timeout to this struct so nodes know when to return
249 250 251
	pmes := DHTMessage{
		Type: PBDHTMessage_DIAGNOSTIC,
		Id:   GenerateMessageID(),
252 253
	}

254
	listen_chan := dht.ListenFor(pmes.Id, len(targets), time.Minute*2)
255 256

	pbmes := pmes.ToProtobuf()
257
	for _, p := range targets {
258
		mes := swarm.NewMessage(p, pbmes)
259
		dht.network.Chan.Outgoing <- mes
260 261 262 263 264 265 266 267 268 269
	}

	var out []*diagInfo
	after := time.After(timeout)
	for count := len(targets); count > 0; {
		select {
		case <-after:
			u.DOut("Diagnostic request timed out.")
			return out, u.ErrTimeout
		case resp := <-listen_chan:
270
			pmes_out := new(PBDHTMessage)
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
			err := proto.Unmarshal(resp.Data, pmes_out)
			if err != nil {
				// NOTE: here and elsewhere, need to audit error handling,
				//		some errors should be continued on from
				return out, err
			}

			dec := json.NewDecoder(bytes.NewBuffer(pmes_out.GetValue()))
			for {
				di := new(diagInfo)
				err := dec.Decode(di)
				if err != nil {
					break
				}

				out = append(out, di)
			}
		}
	}

291
	return nil, nil
292
}