routing.go 7.11 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
5
	"encoding/json"
6 7 8
	"errors"
	"math/rand"
	"time"
9

10 11
	proto "code.google.com/p/goprotobuf/proto"

12 13
	ma "github.com/jbenet/go-multiaddr"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14 15 16
	peer "github.com/jbenet/go-ipfs/peer"
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18
)

19 20 21
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6

22 23
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
24 25
	//return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
	return uint64(rand.Uint32())
26 27
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28 29 30 31 32
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
33
// This is the top level "Store" operation of the DHT
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
35
	var p *peer.Peer
36
	p = s.routes[0].NearestPeer(convertKey(key))
37
	if p == nil {
38
		return errors.New("Table returned nil peer!")
39
	}
40

41
	return s.putValueToPeer(p, string(key), value)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42 43 44
}

// GetValue searches for the value corresponding to given Key.
Jeromy's avatar
Jeromy committed
45 46
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
48
	var p *peer.Peer
49
	p = s.routes[0].NearestPeer(convertKey(key))
50
	if p == nil {
51
		return nil, errors.New("Table returned nil peer!")
52
	}
53

54 55 56 57
	pmes := DHTMessage{
		Type: PBDHTMessage_GET_VALUE,
		Key:  string(key),
		Id:   GenerateMessageID(),
58
	}
59
	response_chan := s.ListenFor(pmes.Id, 1, time.Minute)
60

61 62
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
	s.network.Chan.Outgoing <- mes
63

64
	// Wait for either the response or a timeout
65 66
	timeup := time.After(timeout)
	select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67
	case <-timeup:
68
		s.Unlisten(pmes.Id)
69
		return nil, u.ErrTimeout
70 71
	case resp, ok := <-response_chan:
		if !ok {
72 73
			u.PErr("response channel closed before timeout, please investigate.")
			return nil, u.ErrTimeout
74
		}
75
		pmes_out := new(PBDHTMessage)
76 77
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
78
			return nil, err
79
		}
Jeromy's avatar
Jeromy committed
80 81 82 83 84
		if pmes_out.GetSuccess() {
			return pmes_out.GetValue(), nil
		} else {
			return pmes_out.GetValue(), u.ErrSearchIncomplete
		}
85
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86 87 88 89 90 91
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

// Announce that this node can provide value for given key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92
func (s *IpfsDHT) Provide(key u.Key) error {
93
	peers := s.routes[0].NearestPeers(convertKey(key), PoolSize)
94 95 96 97
	if len(peers) == 0 {
		//return an error
	}

98 99 100
	pmes := DHTMessage{
		Type: PBDHTMessage_ADD_PROVIDER,
		Key:  string(key),
101 102 103
	}
	pbmes := pmes.ToProtobuf()

104
	for _, p := range peers {
105
		mes := swarm.NewMessage(p, pbmes)
106
		s.network.Chan.Outgoing <- mes
107 108
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
109 110 111
}

// FindProviders searches for peers who can provide the value for given key.
112
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
113
	p := s.routes[0].NearestPeer(convertKey(key))
114

115 116 117 118
	pmes := DHTMessage{
		Type: PBDHTMessage_GET_PROVIDERS,
		Key:  string(key),
		Id:   GenerateMessageID(),
119 120 121 122
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

123
	listen_chan := s.ListenFor(pmes.Id, 1, time.Minute)
124
	u.DOut("Find providers for: '%s'", key)
125
	s.network.Chan.Outgoing <- mes
126 127 128 129 130 131
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp := <-listen_chan:
132
		u.DOut("FindProviders: got response.")
133
		pmes_out := new(PBDHTMessage)
134 135 136 137
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
138 139
		var addrs map[u.Key]string
		err = json.Unmarshal(pmes_out.GetValue(), &addrs)
140 141 142 143
		if err != nil {
			return nil, err
		}

144
		var prov_arr []*peer.Peer
145
		for pid, addr := range addrs {
146
			p := s.network.Find(pid)
147
			if p == nil {
148
				maddr, err := ma.NewMultiaddr(addr)
149 150 151 152
				if err != nil {
					u.PErr("error connecting to new peer: %s", err)
					continue
				}
153
				p, err = s.Connect(maddr)
154 155 156 157 158
				if err != nil {
					u.PErr("error connecting to new peer: %s", err)
					continue
				}
			}
159 160
			s.addProviderEntry(key, p)
			prov_arr = append(prov_arr, p)
161 162
		}

163
		return prov_arr, nil
164
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165 166 167 168 169 170
}

// Find specific Peer

// FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
171
	p := s.routes[0].NearestPeer(convertPeerID(id))
172

173 174 175 176
	pmes := DHTMessage{
		Type: PBDHTMessage_FIND_NODE,
		Key:  string(id),
		Id:   GenerateMessageID(),
177 178 179 180
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

181
	listen_chan := s.ListenFor(pmes.Id, 1, time.Minute)
182
	s.network.Chan.Outgoing <- mes
183 184 185 186 187 188
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp := <-listen_chan:
189
		pmes_out := new(PBDHTMessage)
190 191 192 193
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
Jeromy's avatar
Jeromy committed
194 195 196 197 198 199
		addr := string(pmes_out.GetValue())
		maddr, err := ma.NewMultiaddr(addr)
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
200 201 202 203 204 205 206 207 208 209 210 211
		found_peer, err := s.Connect(maddr)
		if err != nil {
			u.POut("Found peer but couldnt connect.")
			return nil, err
		}

		if !found_peer.ID.Equal(id) {
			u.POut("FindPeer: searching for '%s' but found '%s'", id.Pretty(), found_peer.ID.Pretty())
			return found_peer, u.ErrSearchIncomplete
		}

		return found_peer, nil
212
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213
}
214 215 216 217 218 219

// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
	// Thoughts: maybe this should accept an ID and do a peer lookup?
	u.DOut("Enter Ping.")

220
	pmes := DHTMessage{Id: GenerateMessageID(), Type: PBDHTMessage_PING}
221 222 223
	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	before := time.Now()
224
	response_chan := dht.ListenFor(pmes.Id, 1, time.Minute)
225 226 227 228 229 230
	dht.network.Chan.Outgoing <- mes

	tout := time.After(timeout)
	select {
	case <-response_chan:
		roundtrip := time.Since(before)
231
		p.SetLatency(roundtrip)
232 233 234 235 236 237 238 239 240
		u.POut("Ping took %s.", roundtrip.String())
		return nil
	case <-tout:
		// Timed out, think about removing peer from network
		u.DOut("Ping peer timed out.")
		dht.Unlisten(pmes.Id)
		return u.ErrTimeout
	}
}
241 242 243 244 245 246 247

func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
	u.DOut("Begin Diagnostic")
	//Send to N closest peers
	targets := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)

	// TODO: Add timeout to this struct so nodes know when to return
248 249 250
	pmes := DHTMessage{
		Type: PBDHTMessage_DIAGNOSTIC,
		Id:   GenerateMessageID(),
251 252
	}

253
	listen_chan := dht.ListenFor(pmes.Id, len(targets), time.Minute*2)
254 255

	pbmes := pmes.ToProtobuf()
256
	for _, p := range targets {
257
		mes := swarm.NewMessage(p, pbmes)
258
		dht.network.Chan.Outgoing <- mes
259 260 261 262 263 264 265 266 267 268
	}

	var out []*diagInfo
	after := time.After(timeout)
	for count := len(targets); count > 0; {
		select {
		case <-after:
			u.DOut("Diagnostic request timed out.")
			return out, u.ErrTimeout
		case resp := <-listen_chan:
269
			pmes_out := new(PBDHTMessage)
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
			err := proto.Unmarshal(resp.Data, pmes_out)
			if err != nil {
				// NOTE: here and elsewhere, need to audit error handling,
				//		some errors should be continued on from
				return out, err
			}

			dec := json.NewDecoder(bytes.NewBuffer(pmes_out.GetValue()))
			for {
				di := new(diagInfo)
				err := dec.Decode(di)
				if err != nil {
					break
				}

				out = append(out, di)
			}
		}
	}

290
	return nil, nil
291
}