routing.go 3.69 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4 5 6
	"math/rand"
	"time"

7 8
	proto "code.google.com/p/goprotobuf/proto"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9 10 11
	peer "github.com/jbenet/go-ipfs/peer"
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12 13
)

14 15 16
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6

17 18
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
19
	return uint64(rand.Uint32()) << 32 & uint64(rand.Uint32())
20 21
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22 23 24 25 26
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
28
	var p *peer.Peer
29
	p = s.routes.NearestPeer(convertKey(key))
30 31 32
	if p == nil {
		panic("Table returned nil peer!")
	}
33

34 35 36 37 38 39
	pmes := pDHTMessage{
		Type: DHTMessage_PUT_VALUE,
		Key: string(key),
		Value: value,
		Id: GenerateMessageID(),
	}
40

41
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
42 43
	s.network.Chan.Outgoing <- mes
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44 45 46 47
}

// GetValue searches for the value corresponding to given Key.
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
48
	var p *peer.Peer
49
	p = s.routes.NearestPeer(convertKey(key))
50 51 52
	if p == nil {
		panic("Table returned nil peer!")
	}
53

54 55 56 57 58 59
	pmes := pDHTMessage{
		Type: DHTMessage_GET_VALUE,
		Key: string(key),
		Id: GenerateMessageID(),
	}
	response_chan := s.ListenFor(pmes.Id)
60

61 62
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
	s.network.Chan.Outgoing <- mes
63

64
	// Wait for either the response or a timeout
65 66
	timeup := time.After(timeout)
	select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67
	case <-timeup:
68
		s.Unlisten(pmes.Id)
69
		return nil, u.ErrTimeout
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
	case resp := <-response_chan:
71 72 73 74 75 76
		pmes_out := new(DHTMessage)
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil,err
		}
		return pmes_out.GetValue(), nil
77
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78 79 80 81 82 83
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

// Announce that this node can provide value for given key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
84
func (s *IpfsDHT) Provide(key u.Key) error {
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
	peers := s.routes.NearestPeers(convertKey(key), PoolSize)
	if len(peers) == 0 {
		//return an error
	}

	pmes := pDHTMessage{
		Type: DHTMessage_ADD_PROVIDER,
		Key: string(key),
	}
	pbmes := pmes.ToProtobuf()

	for _,p := range peers {
		mes := swarm.NewMessage(p, pbmes)
		s.network.Chan.Outgoing <-mes
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101 102 103
}

// FindProviders searches for peers who can provide the value for given key.
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
	p := s.routes.NearestPeer(convertKey(key))

	pmes := pDHTMessage{
		Type: DHTMessage_GET_PROVIDERS,
		Key: string(key),
		Id: GenerateMessageID(),
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	listen_chan := s.ListenFor(pmes.Id)
	s.network.Chan.Outgoing <-mes
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp := <-listen_chan:
		pmes_out := new(DHTMessage)
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
		panic("Not yet implemented.")
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130 131 132 133 134 135
}

// Find specific Peer

// FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
	p := s.routes.NearestPeer(convertPeerID(id))

	pmes := pDHTMessage{
		Type: DHTMessage_FIND_NODE,
		Key: string(id),
		Id: GenerateMessageID(),
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	listen_chan := s.ListenFor(pmes.Id)
	s.network.Chan.Outgoing <-mes
	after := time.After(timeout)
	select {
	case <-after:
		s.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp := <-listen_chan:
		pmes_out := new(DHTMessage)
		err := proto.Unmarshal(resp.Data, pmes_out)
		if err != nil {
			return nil, err
		}
		panic("Not yet implemented.")
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161
}