routing.go 4.3 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4 5
	"math/rand"
	"time"
6
	"encoding/json"
7

8 9
	proto "code.google.com/p/goprotobuf/proto"

10 11
	ma "github.com/jbenet/go-multiaddr"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12 13 14
	peer "github.com/jbenet/go-ipfs/peer"
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

17 18 19
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6

20 21
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
22
	return uint64(rand.Uint32()) << 32 & uint64(rand.Uint32())
23 24
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25 26 27 28 29
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
31
	var p *peer.Peer
32
	p = s.routes.NearestPeer(convertKey(key))
33 34 35
	if p == nil {
		panic("Table returned nil peer!")
	}
36

37 38 39 40 41 42
	pmes := pDHTMessage{
		Type: DHTMessage_PUT_VALUE,
		Key: string(key),
		Value: value,
		Id: GenerateMessageID(),
	}
43

44
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
45 46
	s.network.Chan.Outgoing <- mes
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47 48 49 50
}

// GetValue searches for the value corresponding to given Key.
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
51
	var p *peer.Peer
52
	p = s.routes.NearestPeer(convertKey(key))
53 54 55
	if p == nil {
		panic("Table returned nil peer!")
	}
56

57 58 59 60 61 62
	pmes := pDHTMessage{
		Type: DHTMessage_GET_VALUE,
		Key: string(key),
		Id: GenerateMessageID(),
	}
	response_chan := s.ListenFor(pmes.Id)
63

64 65
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
	s.network.Chan.Outgoing <- mes
66

67
	// Wait for either the response or a timeout
68 69
	timeup := time.After(timeout)
	select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
	case <-timeup:
71
		s.Unlisten(pmes.Id)
72
		return nil, u.ErrTimeout
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
	case resp := <-response_chan:
74 75 76 77 78 79
		pmes_out := new(DHTMessage)
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil,err
		}
		return pmes_out.GetValue(), nil
80
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81 82 83 84 85 86
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

// Announce that this node can provide value for given key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87
func (s *IpfsDHT) Provide(key u.Key) error {
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
	peers := s.routes.NearestPeers(convertKey(key), PoolSize)
	if len(peers) == 0 {
		//return an error
	}

	pmes := pDHTMessage{
		Type: DHTMessage_ADD_PROVIDER,
		Key: string(key),
	}
	pbmes := pmes.ToProtobuf()

	for _,p := range peers {
		mes := swarm.NewMessage(p, pbmes)
		s.network.Chan.Outgoing <-mes
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105 106
}

// FindProviders searches for peers who can provide the value for given key.
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
	p := s.routes.NearestPeer(convertKey(key))

	pmes := pDHTMessage{
		Type: DHTMessage_GET_PROVIDERS,
		Key: string(key),
		Id: GenerateMessageID(),
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	listen_chan := s.ListenFor(pmes.Id)
	s.network.Chan.Outgoing <-mes
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp := <-listen_chan:
		pmes_out := new(DHTMessage)
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
		var addrs map[string]string
		err := json.Unmarshal(pmes_out.GetValue(), &addrs)
		if err != nil {
			return nil, err
		}

		for key,addr := range addrs {
			p := s.network.Find(u.Key(key))
			if p == nil {
				maddr,err := ma.NewMultiaddr(addr)
				if err != nil {
					u.PErr("error connecting to new peer: %s", err)
					continue
				}
				p, err := s.Connect(maddr)
				if err != nil {
					u.PErr("error connecting to new peer: %s", err)
					continue
				}
			}
			s.providerLock.Lock()
			prov_arr := s.providers[key]
			s.providers[key] = append(prov_arr, p)
			s.providerLock.Unlock()
		}

157
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158 159 160 161 162 163
}

// Find specific Peer

// FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
	p := s.routes.NearestPeer(convertPeerID(id))

	pmes := pDHTMessage{
		Type: DHTMessage_FIND_NODE,
		Key: string(id),
		Id: GenerateMessageID(),
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	listen_chan := s.ListenFor(pmes.Id)
	s.network.Chan.Outgoing <-mes
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp := <-listen_chan:
		pmes_out := new(DHTMessage)
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
		panic("Not yet implemented.")
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
189
}