routing.go 5.74 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4 5
	"math/rand"
	"time"
6
	"encoding/json"
7

8 9
	proto "code.google.com/p/goprotobuf/proto"

10 11
	ma "github.com/jbenet/go-multiaddr"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12 13 14
	peer "github.com/jbenet/go-ipfs/peer"
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

17 18 19
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6

20 21
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
22 23
	//return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
	return uint64(rand.Uint32())
24 25
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27 28 29 30
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
32
	var p *peer.Peer
33
	p = s.routes.NearestPeer(convertKey(key))
34 35 36
	if p == nil {
		panic("Table returned nil peer!")
	}
37

38 39 40 41 42 43
	pmes := pDHTMessage{
		Type: DHTMessage_PUT_VALUE,
		Key: string(key),
		Value: value,
		Id: GenerateMessageID(),
	}
44

45
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
46 47
	s.network.Chan.Outgoing <- mes
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48 49 50
}

// GetValue searches for the value corresponding to given Key.
Jeromy's avatar
Jeromy committed
51 52
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
53
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
54
	var p *peer.Peer
55
	p = s.routes.NearestPeer(convertKey(key))
56 57 58
	if p == nil {
		panic("Table returned nil peer!")
	}
59

60 61 62 63 64 65
	pmes := pDHTMessage{
		Type: DHTMessage_GET_VALUE,
		Key: string(key),
		Id: GenerateMessageID(),
	}
	response_chan := s.ListenFor(pmes.Id)
66

67 68
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
	s.network.Chan.Outgoing <- mes
69

70
	// Wait for either the response or a timeout
71 72
	timeup := time.After(timeout)
	select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
	case <-timeup:
74
		s.Unlisten(pmes.Id)
75
		return nil, u.ErrTimeout
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
76
	case resp := <-response_chan:
77 78 79 80 81
		pmes_out := new(DHTMessage)
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil,err
		}
Jeromy's avatar
Jeromy committed
82 83 84 85 86
		if pmes_out.GetSuccess() {
			return pmes_out.GetValue(), nil
		} else {
			return pmes_out.GetValue(), u.ErrSearchIncomplete
		}
87
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88 89 90 91 92 93
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

// Announce that this node can provide value for given key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
94
func (s *IpfsDHT) Provide(key u.Key) error {
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
	peers := s.routes.NearestPeers(convertKey(key), PoolSize)
	if len(peers) == 0 {
		//return an error
	}

	pmes := pDHTMessage{
		Type: DHTMessage_ADD_PROVIDER,
		Key: string(key),
	}
	pbmes := pmes.ToProtobuf()

	for _,p := range peers {
		mes := swarm.NewMessage(p, pbmes)
		s.network.Chan.Outgoing <-mes
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
111 112 113
}

// FindProviders searches for peers who can provide the value for given key.
114 115 116 117 118 119 120 121 122 123 124 125
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
	p := s.routes.NearestPeer(convertKey(key))

	pmes := pDHTMessage{
		Type: DHTMessage_GET_PROVIDERS,
		Key: string(key),
		Id: GenerateMessageID(),
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	listen_chan := s.ListenFor(pmes.Id)
126
	u.DOut("Find providers for: '%s'", key)
127 128 129 130 131 132 133
	s.network.Chan.Outgoing <-mes
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp := <-listen_chan:
134
		u.DOut("FindProviders: got response.")
135 136 137 138 139
		pmes_out := new(DHTMessage)
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
140 141
		var addrs map[u.Key]string
		err = json.Unmarshal(pmes_out.GetValue(), &addrs)
142 143 144 145
		if err != nil {
			return nil, err
		}

146 147 148
		var prov_arr []*peer.Peer
		for pid,addr := range addrs {
			p := s.network.Find(pid)
149 150 151 152 153 154
			if p == nil {
				maddr,err := ma.NewMultiaddr(addr)
				if err != nil {
					u.PErr("error connecting to new peer: %s", err)
					continue
				}
155
				p, err = s.Connect(maddr)
156 157 158 159 160
				if err != nil {
					u.PErr("error connecting to new peer: %s", err)
					continue
				}
			}
161 162
			s.addProviderEntry(key, p)
			prov_arr = append(prov_arr, p)
163 164
		}

165 166
		return prov_arr, nil

167
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
168 169 170 171 172 173
}

// Find specific Peer

// FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
	p := s.routes.NearestPeer(convertPeerID(id))

	pmes := pDHTMessage{
		Type: DHTMessage_FIND_NODE,
		Key: string(id),
		Id: GenerateMessageID(),
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	listen_chan := s.ListenFor(pmes.Id)
	s.network.Chan.Outgoing <-mes
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp := <-listen_chan:
		pmes_out := new(DHTMessage)
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
Jeromy's avatar
Jeromy committed
197 198 199 200 201 202
		addr := string(pmes_out.GetValue())
		maddr, err := ma.NewMultiaddr(addr)
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
203 204 205 206 207 208 209 210 211 212 213 214
		found_peer, err := s.Connect(maddr)
		if err != nil {
			u.POut("Found peer but couldnt connect.")
			return nil, err
		}

		if !found_peer.ID.Equal(id) {
			u.POut("FindPeer: searching for '%s' but found '%s'", id.Pretty(), found_peer.ID.Pretty())
			return found_peer, u.ErrSearchIncomplete
		}

		return found_peer, nil
215
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
216
}
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233

// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
	// Thoughts: maybe this should accept an ID and do a peer lookup?
	u.DOut("Enter Ping.")

	pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING}
	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	before := time.Now()
	response_chan := dht.ListenFor(pmes.Id)
	dht.network.Chan.Outgoing <- mes

	tout := time.After(timeout)
	select {
	case <-response_chan:
		roundtrip := time.Since(before)
Jeromy's avatar
Jeromy committed
234
		p.SetDistance(roundtrip)
235 236 237 238 239 240 241 242 243
		u.POut("Ping took %s.", roundtrip.String())
		return nil
	case <-tout:
		// Timed out, think about removing peer from network
		u.DOut("Ping peer timed out.")
		dht.Unlisten(pmes.Id)
		return u.ErrTimeout
	}
}