standard.go 4.97 KB
Newer Older
1 2 3
package proxy

import (
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6
	"errors"

7 8
	dhtpb "gx/ipfs/QmRmroYSdievxnjiuy99C8BzShNstdEWcEF3LQHF7fUbez/go-libp2p-kad-dht/pb"
	inet "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net"
Jeromy's avatar
Jeromy committed
9
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
10
	host "gx/ipfs/QmUywuGNZoUKV8B9iyvup9bPkLiMrhTsyVMkeSXW5VxAfC/go-libp2p-host"
11
	loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables"
12
	pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
13
	ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
14
	kbucket "gx/ipfs/QmaQG6fJdzn2532WHoPdVwKqftXr6iCSr5NtWyGi1BHytT/go-libp2p-kbucket"
15
	peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
16 17
)

18
// ProtocolSNR is the protocol identifier this package passes to
// Host.NewStream when opening a stream to a remote.
const ProtocolSNR = "/ipfs/supernoderouting"
19

Jeromy's avatar
Jeromy committed
20
// log is the package-level logger, registered under the name
// "supernode/proxy".
var log = logging.Logger("supernode/proxy")
Brian Tiger Chow's avatar
Brian Tiger Chow committed
21

22
type Proxy interface {
23
	Bootstrap(context.Context) error
24
	HandleStream(inet.Stream)
25 26 27 28 29
	SendMessage(ctx context.Context, m *dhtpb.Message) error
	SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error)
}

type standard struct {
30 31
	Host host.Host

Jeromy's avatar
Jeromy committed
32 33
	remoteInfos []pstore.PeerInfo // addr required for bootstrapping
	remoteIDs   []peer.ID         // []ID is required for each req. here, cached for performance.
34 35
}

Jeromy's avatar
Jeromy committed
36
func Standard(h host.Host, remotes []pstore.PeerInfo) Proxy {
37 38 39 40 41
	var ids []peer.ID
	for _, remote := range remotes {
		ids = append(ids, remote.ID)
	}
	return &standard{h, remotes, ids}
42 43
}

44
func (px *standard) Bootstrap(ctx context.Context) error {
Jeromy's avatar
Jeromy committed
45
	var cxns []pstore.PeerInfo
46
	for _, info := range px.remoteInfos {
47
		if err := px.Host.Connect(ctx, info); err != nil {
48
			continue
49
		}
50 51 52
		cxns = append(cxns, info)
	}
	if len(cxns) == 0 {
rht's avatar
rht committed
53
		log.Error("unable to bootstrap to any supernode routers")
54
	} else {
Brian Tiger Chow's avatar
fix log  
Brian Tiger Chow committed
55
		log.Infof("bootstrapped to %d supernode routers: %s", len(cxns), cxns)
56
	}
57 58 59
	return nil
}

60
// HandleStream deals with a stream opened by another peer. This
// implementation does not serve routing requests: it logs the sender and
// closes the stream without reading from it.
func (px *standard) HandleStream(s inet.Stream) {
	// TODO(brian): Should clients be able to satisfy requests?

	// Use a format verb: Sprint-style joining puts no space after a string
	// operand, which glued the peer ID directly onto "from".
	log.Errorf("supernode client received (dropped) a routing message from %s", s.Conn().RemotePeer())
	s.Close()
}
65

66 67
// replicationFactor is the number of successful sends SendMessage aims for
// when the message type is ADD_PROVIDER or PUT_VALUE.
const replicationFactor = 2

Brian Tiger Chow's avatar
Brian Tiger Chow committed
68 69 70
// SendMessage sends message to each remote sequentially (randomized order),
// stopping after the first successful response. If all fail, returns the last
// error.
71
func (px *standard) SendMessage(ctx context.Context, m *dhtpb.Message) error {
72
	var err error
73
	var numSuccesses int
74
	for _, remote := range sortedByKey(px.remoteIDs, m.GetKey()) {
75 76 77
		if err = px.sendMessage(ctx, m, remote); err != nil { // careful don't re-declare err!
			continue
		}
78 79 80 81 82 83 84
		numSuccesses++
		switch m.GetType() {
		case dhtpb.Message_ADD_PROVIDER, dhtpb.Message_PUT_VALUE:
			if numSuccesses < replicationFactor {
				continue
			}
		}
85 86 87 88 89
		return nil // success
	}
	return err // NB: returns the last error
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
90 91 92 93 94 95 96 97
func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) (err error) {
	e := log.EventBegin(ctx, "sendRoutingMessage", px.Host.ID(), remote, m)
	defer func() {
		if err != nil {
			e.SetError(err)
		}
		e.Done()
	}()
Jeromy's avatar
Jeromy committed
98
	if err = px.Host.Connect(ctx, pstore.PeerInfo{ID: remote}); err != nil {
99 100
		return err
	}
Jeromy's avatar
Jeromy committed
101
	s, err := px.Host.NewStream(ctx, remote, ProtocolSNR)
102 103 104 105 106 107
	if err != nil {
		return err
	}
	defer s.Close()
	pbw := ggio.NewDelimitedWriter(s)
	if err := pbw.WriteMsg(m); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108
		return err
109 110 111 112
	}
	return nil
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
113 114 115
// SendRequest sends the request to each remote sequentially (randomized order),
// stopping after the first successful response. If all fail, returns the last
// error.
116
func (px *standard) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) {
117
	var err error
118
	for _, remote := range sortedByKey(px.remoteIDs, m.GetKey()) {
119 120 121 122 123 124 125 126 127 128
		var reply *dhtpb.Message
		reply, err = px.sendRequest(ctx, m, remote) // careful don't redeclare err!
		if err != nil {
			continue
		}
		return reply, nil // success
	}
	return nil, err // NB: returns the last error
}

129
func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (*dhtpb.Message, error) {
Jeromy's avatar
Jeromy committed
130
	e := log.EventBegin(ctx, "sendRoutingRequest", px.Host.ID(), remote, logging.Pair("request", m))
131
	defer e.Done()
Jeromy's avatar
Jeromy committed
132
	if err := px.Host.Connect(ctx, pstore.PeerInfo{ID: remote}); err != nil {
133
		e.SetError(err)
134 135
		return nil, err
	}
Jeromy's avatar
Jeromy committed
136
	s, err := px.Host.NewStream(ctx, remote, ProtocolSNR)
137
	if err != nil {
138
		e.SetError(err)
139 140 141 142 143
		return nil, err
	}
	defer s.Close()
	r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
	w := ggio.NewDelimitedWriter(s)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
144
	if err = w.WriteMsg(m); err != nil {
145
		e.SetError(err)
146 147 148
		return nil, err
	}

149 150 151
	response := &dhtpb.Message{}
	if err = r.ReadMsg(response); err != nil {
		e.SetError(err)
152 153 154
		return nil, err
	}
	// need ctx expiration?
155 156 157 158
	if response == nil {
		err := errors.New("no response to request")
		e.SetError(err)
		return nil, err
159
	}
Jeromy's avatar
Jeromy committed
160
	e.Append(logging.Pair("response", response))
161
	e.Append(logging.Pair("uuid", loggables.Uuid("foo")))
162
	return response, nil
163
}
164

165
func sortedByKey(peers []peer.ID, skey string) []peer.ID {
166
	target := kbucket.ConvertKey(skey)
167 168
	return kbucket.SortClosestPeers(peers, target)
}