routing.go 6.87 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
5
	"encoding/json"
6
	"errors"
Jeromy's avatar
Jeromy committed
7
	"fmt"
8 9
	"math/rand"
	"time"
10

11 12
	proto "code.google.com/p/goprotobuf/proto"

13 14
	ma "github.com/jbenet/go-multiaddr"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	peer "github.com/jbenet/go-ipfs/peer"
16
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20
)

21 22 23
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6

24 25
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
26 27
	//return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
	return uint64(rand.Uint32())
28 29
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30 31 32 33 34
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
35
// This is the top level "Store" operation of the DHT
Jeromy's avatar
Jeromy committed
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
func (s *IpfsDHT) PutValue(key u.Key, value []byte) {
	complete := make(chan struct{})
	for i, route := range s.routes {
		p := route.NearestPeer(kb.ConvertKey(key))
		if p == nil {
			s.network.Chan.Errors <- fmt.Errorf("No peer found on level %d", i)
			continue
			go func() {
				complete <- struct{}{}
			}()
		}
		go func() {
			err := s.putValueToNetwork(p, string(key), value)
			if err != nil {
				s.network.Chan.Errors <- err
			}
			complete <- struct{}{}
		}()
	}
	for _, _ = range s.routes {
		<-complete
57
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58 59 60
}

// GetValue searches for the value corresponding to given Key.
Jeromy's avatar
Jeromy committed
61 62
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
Jeromy's avatar
Jeromy committed
64 65 66 67 68 69
	for _, route := range s.routes {
		var p *peer.Peer
		p = route.NearestPeer(kb.ConvertKey(key))
		if p == nil {
			return nil, errors.New("Table returned nil peer!")
		}
70

Jeromy's avatar
Jeromy committed
71 72 73
		b, err := s.getValueSingle(p, key, timeout)
		if err == nil {
			return b, nil
74
		}
Jeromy's avatar
Jeromy committed
75
		if err != u.ErrSearchIncomplete {
76
			return nil, err
77
		}
78
	}
Jeromy's avatar
Jeromy committed
79
	return nil, u.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80 81 82 83 84 85
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

// Announce that this node can provide value for given key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
func (s *IpfsDHT) Provide(key u.Key) error {
87
	peers := s.routes[0].NearestPeers(kb.ConvertKey(key), PoolSize)
88 89 90 91
	if len(peers) == 0 {
		//return an error
	}

92 93 94
	pmes := DHTMessage{
		Type: PBDHTMessage_ADD_PROVIDER,
		Key:  string(key),
95 96 97
	}
	pbmes := pmes.ToProtobuf()

98
	for _, p := range peers {
99
		mes := swarm.NewMessage(p, pbmes)
100
		s.network.Chan.Outgoing <- mes
101 102
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
103 104 105
}

// FindProviders searches for peers who can provide the value for given key.
106
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
107
	p := s.routes[0].NearestPeer(kb.ConvertKey(key))
108

109 110 111 112
	pmes := DHTMessage{
		Type: PBDHTMessage_GET_PROVIDERS,
		Key:  string(key),
		Id:   GenerateMessageID(),
113 114 115 116
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

Jeromy's avatar
Jeromy committed
117
	listenChan := s.ListenFor(pmes.Id, 1, time.Minute)
118
	u.DOut("Find providers for: '%s'", key)
119
	s.network.Chan.Outgoing <- mes
120 121 122 123 124
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
Jeromy's avatar
Jeromy committed
125
	case resp := <-listenChan:
126
		u.DOut("FindProviders: got response.")
127
		pmes_out := new(PBDHTMessage)
128 129 130 131
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
132 133
		var addrs map[u.Key]string
		err = json.Unmarshal(pmes_out.GetValue(), &addrs)
134 135 136 137
		if err != nil {
			return nil, err
		}

138
		var prov_arr []*peer.Peer
139
		for pid, addr := range addrs {
140
			p := s.network.Find(pid)
141
			if p == nil {
142
				maddr, err := ma.NewMultiaddr(addr)
143 144 145 146
				if err != nil {
					u.PErr("error connecting to new peer: %s", err)
					continue
				}
147
				p, err = s.Connect(maddr)
148 149 150 151 152
				if err != nil {
					u.PErr("error connecting to new peer: %s", err)
					continue
				}
			}
153 154
			s.addProviderEntry(key, p)
			prov_arr = append(prov_arr, p)
155 156
		}

157
		return prov_arr, nil
158
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159 160 161 162 163 164
}

// Find specific Peer

// FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
165
	p := s.routes[0].NearestPeer(kb.ConvertPeerID(id))
166

167 168 169 170
	pmes := DHTMessage{
		Type: PBDHTMessage_FIND_NODE,
		Key:  string(id),
		Id:   GenerateMessageID(),
171 172 173 174
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

Jeromy's avatar
Jeromy committed
175
	listenChan := s.ListenFor(pmes.Id, 1, time.Minute)
176
	s.network.Chan.Outgoing <- mes
177 178 179 180 181
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
Jeromy's avatar
Jeromy committed
182
	case resp := <-listenChan:
183
		pmes_out := new(PBDHTMessage)
184 185 186 187
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
Jeromy's avatar
Jeromy committed
188 189 190 191 192 193
		addr := string(pmes_out.GetValue())
		maddr, err := ma.NewMultiaddr(addr)
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
194 195 196 197 198 199 200 201 202 203 204 205
		found_peer, err := s.Connect(maddr)
		if err != nil {
			u.POut("Found peer but couldnt connect.")
			return nil, err
		}

		if !found_peer.ID.Equal(id) {
			u.POut("FindPeer: searching for '%s' but found '%s'", id.Pretty(), found_peer.ID.Pretty())
			return found_peer, u.ErrSearchIncomplete
		}

		return found_peer, nil
206
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207
}
208 209 210 211 212 213

// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
	// Thoughts: maybe this should accept an ID and do a peer lookup?
	u.DOut("Enter Ping.")

214
	pmes := DHTMessage{Id: GenerateMessageID(), Type: PBDHTMessage_PING}
215 216 217
	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	before := time.Now()
218
	response_chan := dht.ListenFor(pmes.Id, 1, time.Minute)
219 220 221 222 223 224
	dht.network.Chan.Outgoing <- mes

	tout := time.After(timeout)
	select {
	case <-response_chan:
		roundtrip := time.Since(before)
225
		p.SetLatency(roundtrip)
226 227 228 229 230 231 232 233 234
		u.POut("Ping took %s.", roundtrip.String())
		return nil
	case <-tout:
		// Timed out, think about removing peer from network
		u.DOut("Ping peer timed out.")
		dht.Unlisten(pmes.Id)
		return u.ErrTimeout
	}
}
235 236 237 238

func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
	u.DOut("Begin Diagnostic")
	//Send to N closest peers
239
	targets := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
240 241

	// TODO: Add timeout to this struct so nodes know when to return
242 243 244
	pmes := DHTMessage{
		Type: PBDHTMessage_DIAGNOSTIC,
		Id:   GenerateMessageID(),
245 246
	}

Jeromy's avatar
Jeromy committed
247
	listenChan := dht.ListenFor(pmes.Id, len(targets), time.Minute*2)
248 249

	pbmes := pmes.ToProtobuf()
250
	for _, p := range targets {
251
		mes := swarm.NewMessage(p, pbmes)
252
		dht.network.Chan.Outgoing <- mes
253 254 255 256 257 258 259 260 261
	}

	var out []*diagInfo
	after := time.After(timeout)
	for count := len(targets); count > 0; {
		select {
		case <-after:
			u.DOut("Diagnostic request timed out.")
			return out, u.ErrTimeout
Jeromy's avatar
Jeromy committed
262
		case resp := <-listenChan:
263
			pmes_out := new(PBDHTMessage)
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
			err := proto.Unmarshal(resp.Data, pmes_out)
			if err != nil {
				// NOTE: here and elsewhere, need to audit error handling,
				//		some errors should be continued on from
				return out, err
			}

			dec := json.NewDecoder(bytes.NewBuffer(pmes_out.GetValue()))
			for {
				di := new(diagInfo)
				err := dec.Decode(di)
				if err != nil {
					break
				}

				out = append(out, di)
			}
		}
	}

284
	return nil, nil
285
}