routing.go 5.56 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4 5
	"math/rand"
	"time"
6
	"encoding/json"
7

8 9
	proto "code.google.com/p/goprotobuf/proto"

10 11
	ma "github.com/jbenet/go-multiaddr"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12 13 14
	peer "github.com/jbenet/go-ipfs/peer"
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

17 18 19
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6

20 21
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
22 23
	//return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
	return uint64(rand.Uint32())
24 25
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27 28 29 30
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
32
	var p *peer.Peer
33
	p = s.routes.NearestPeer(convertKey(key))
34 35 36
	if p == nil {
		panic("Table returned nil peer!")
	}
37

38 39 40 41 42 43
	pmes := pDHTMessage{
		Type: DHTMessage_PUT_VALUE,
		Key: string(key),
		Value: value,
		Id: GenerateMessageID(),
	}
44

45
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
46 47
	s.network.Chan.Outgoing <- mes
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48 49 50 51
}

// GetValue searches for the value corresponding to given Key.
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
52
	var p *peer.Peer
53
	p = s.routes.NearestPeer(convertKey(key))
54 55 56
	if p == nil {
		panic("Table returned nil peer!")
	}
57

58 59 60 61 62 63
	pmes := pDHTMessage{
		Type: DHTMessage_GET_VALUE,
		Key: string(key),
		Id: GenerateMessageID(),
	}
	response_chan := s.ListenFor(pmes.Id)
64

65 66
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
	s.network.Chan.Outgoing <- mes
67

68
	// Wait for either the response or a timeout
69 70
	timeup := time.After(timeout)
	select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
	case <-timeup:
72
		s.Unlisten(pmes.Id)
73
		return nil, u.ErrTimeout
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74
	case resp := <-response_chan:
75 76 77 78 79 80
		pmes_out := new(DHTMessage)
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil,err
		}
		return pmes_out.GetValue(), nil
81
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82 83 84 85 86 87
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

// Announce that this node can provide value for given key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88
func (s *IpfsDHT) Provide(key u.Key) error {
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
	peers := s.routes.NearestPeers(convertKey(key), PoolSize)
	if len(peers) == 0 {
		//return an error
	}

	pmes := pDHTMessage{
		Type: DHTMessage_ADD_PROVIDER,
		Key: string(key),
	}
	pbmes := pmes.ToProtobuf()

	for _,p := range peers {
		mes := swarm.NewMessage(p, pbmes)
		s.network.Chan.Outgoing <-mes
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105 106 107
}

// FindProviders searches for peers who can provide the value for given key.
108 109 110 111 112 113 114 115 116 117 118 119
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
	p := s.routes.NearestPeer(convertKey(key))

	pmes := pDHTMessage{
		Type: DHTMessage_GET_PROVIDERS,
		Key: string(key),
		Id: GenerateMessageID(),
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	listen_chan := s.ListenFor(pmes.Id)
120
	u.DOut("Find providers for: '%s'", key)
121 122 123 124 125 126 127
	s.network.Chan.Outgoing <-mes
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp := <-listen_chan:
128
		u.DOut("FindProviders: got response.")
129 130 131 132 133
		pmes_out := new(DHTMessage)
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
134 135
		var addrs map[u.Key]string
		err = json.Unmarshal(pmes_out.GetValue(), &addrs)
136 137 138 139
		if err != nil {
			return nil, err
		}

140 141 142
		var prov_arr []*peer.Peer
		for pid,addr := range addrs {
			p := s.network.Find(pid)
143 144 145 146 147 148
			if p == nil {
				maddr,err := ma.NewMultiaddr(addr)
				if err != nil {
					u.PErr("error connecting to new peer: %s", err)
					continue
				}
149
				p, err = s.Connect(maddr)
150 151 152 153 154
				if err != nil {
					u.PErr("error connecting to new peer: %s", err)
					continue
				}
			}
155 156
			s.addProviderEntry(key, p)
			prov_arr = append(prov_arr, p)
157 158
		}

159 160
		return prov_arr, nil

161
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162 163 164 165 166 167
}

// Find specific Peer

// FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
	p := s.routes.NearestPeer(convertPeerID(id))

	pmes := pDHTMessage{
		Type: DHTMessage_FIND_NODE,
		Key: string(id),
		Id: GenerateMessageID(),
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	listen_chan := s.ListenFor(pmes.Id)
	s.network.Chan.Outgoing <-mes
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp := <-listen_chan:
		pmes_out := new(DHTMessage)
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
Jeromy's avatar
Jeromy committed
191 192 193 194 195 196
		addr := string(pmes_out.GetValue())
		maddr, err := ma.NewMultiaddr(addr)
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
197 198 199 200 201 202 203 204 205 206 207 208
		found_peer, err := s.Connect(maddr)
		if err != nil {
			u.POut("Found peer but couldnt connect.")
			return nil, err
		}

		if !found_peer.ID.Equal(id) {
			u.POut("FindPeer: searching for '%s' but found '%s'", id.Pretty(), found_peer.ID.Pretty())
			return found_peer, u.ErrSearchIncomplete
		}

		return found_peer, nil
209
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
210
}
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237

// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
	// Thoughts: maybe this should accept an ID and do a peer lookup?
	u.DOut("Enter Ping.")

	pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING}
	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	before := time.Now()
	response_chan := dht.ListenFor(pmes.Id)
	dht.network.Chan.Outgoing <- mes

	tout := time.After(timeout)
	select {
	case <-response_chan:
		roundtrip := time.Since(before)
		p.Distance = roundtrip //TODO: This isnt threadsafe
		u.POut("Ping took %s.", roundtrip.String())
		return nil
	case <-tout:
		// Timed out, think about removing peer from network
		u.DOut("Ping peer timed out.")
		dht.Unlisten(pmes.Id)
		return u.ErrTimeout
	}
}