standard.go 4.91 KB
Newer Older
1 2 3
package proxy

import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4 5
	"errors"

6
	ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
7
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
8

Jeromy's avatar
Jeromy committed
9 10
	host "gx/ipfs/QmRW2xiYTpDLWTHb822ZYbPBoh3dGLJwaXLGS9tnPyWZpq/go-libp2p/p2p/host"
	inet "gx/ipfs/QmRW2xiYTpDLWTHb822ZYbPBoh3dGLJwaXLGS9tnPyWZpq/go-libp2p/p2p/net"
11
	logging "gx/ipfs/QmaDNZ4QMdBdku1YZWBysufYyoQt1negQGNav6PLYarbY8/go-log"
12
	peer "gx/ipfs/QmbyvM8zRFDkbFdYyt1MnevUMJ62SiSGbfDFZ3Z8nkrzr4/go-libp2p-peer"
13 14 15 16

	key "github.com/ipfs/go-ipfs/blocks/key"
	dhtpb "github.com/ipfs/go-ipfs/routing/dht/pb"
	kbucket "github.com/ipfs/go-ipfs/routing/kbucket"
17
	loggables "github.com/ipfs/go-ipfs/thirdparty/loggables"
18 19
)

20
const ProtocolSNR = "/ipfs/supernoderouting"
21

Jeromy's avatar
Jeromy committed
22
var log = logging.Logger("supernode/proxy")
Brian Tiger Chow's avatar
Brian Tiger Chow committed
23

24
type Proxy interface {
25
	Bootstrap(context.Context) error
26
	HandleStream(inet.Stream)
27 28 29 30 31
	SendMessage(ctx context.Context, m *dhtpb.Message) error
	SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error)
}

type standard struct {
32 33 34 35
	Host host.Host

	remoteInfos []peer.PeerInfo // addr required for bootstrapping
	remoteIDs   []peer.ID       // []ID is required for each req. here, cached for performance.
36 37
}

38
func Standard(h host.Host, remotes []peer.PeerInfo) Proxy {
39 40 41 42 43
	var ids []peer.ID
	for _, remote := range remotes {
		ids = append(ids, remote.ID)
	}
	return &standard{h, remotes, ids}
44 45
}

46
func (px *standard) Bootstrap(ctx context.Context) error {
47
	var cxns []peer.PeerInfo
48
	for _, info := range px.remoteInfos {
49
		if err := px.Host.Connect(ctx, info); err != nil {
50
			continue
51
		}
52 53 54
		cxns = append(cxns, info)
	}
	if len(cxns) == 0 {
rht's avatar
rht committed
55
		log.Error("unable to bootstrap to any supernode routers")
56
	} else {
Brian Tiger Chow's avatar
fix log  
Brian Tiger Chow committed
57
		log.Infof("bootstrapped to %d supernode routers: %s", len(cxns), cxns)
58
	}
59 60 61
	return nil
}

62
func (p *standard) HandleStream(s inet.Stream) {
63
	// TODO(brian): Should clients be able to satisfy requests?
64
	log.Error("supernode client received (dropped) a routing message from", s.Conn().RemotePeer())
65
	s.Close()
66
}
67

68 69
const replicationFactor = 2

Brian Tiger Chow's avatar
Brian Tiger Chow committed
70 71 72
// SendMessage sends message to each remote sequentially (randomized order),
// stopping after the first successful response. If all fail, returns the last
// error.
73
func (px *standard) SendMessage(ctx context.Context, m *dhtpb.Message) error {
74
	var err error
75
	var numSuccesses int
76
	for _, remote := range sortedByKey(px.remoteIDs, m.GetKey()) {
77 78 79
		if err = px.sendMessage(ctx, m, remote); err != nil { // careful don't re-declare err!
			continue
		}
80 81 82 83 84 85 86
		numSuccesses++
		switch m.GetType() {
		case dhtpb.Message_ADD_PROVIDER, dhtpb.Message_PUT_VALUE:
			if numSuccesses < replicationFactor {
				continue
			}
		}
87 88 89 90 91
		return nil // success
	}
	return err // NB: returns the last error
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
92 93 94 95 96 97 98 99 100
func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) (err error) {
	e := log.EventBegin(ctx, "sendRoutingMessage", px.Host.ID(), remote, m)
	defer func() {
		if err != nil {
			e.SetError(err)
		}
		e.Done()
	}()
	if err = px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil {
101 102
		return err
	}
103
	s, err := px.Host.NewStream(ctx, ProtocolSNR, remote)
104 105 106 107 108 109
	if err != nil {
		return err
	}
	defer s.Close()
	pbw := ggio.NewDelimitedWriter(s)
	if err := pbw.WriteMsg(m); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110
		return err
111 112 113 114
	}
	return nil
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
115 116 117
// SendRequest sends the request to each remote sequentially (randomized order),
// stopping after the first successful response. If all fail, returns the last
// error.
118
func (px *standard) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) {
119
	var err error
120
	for _, remote := range sortedByKey(px.remoteIDs, m.GetKey()) {
121 122 123 124 125 126 127 128 129 130
		var reply *dhtpb.Message
		reply, err = px.sendRequest(ctx, m, remote) // careful don't redeclare err!
		if err != nil {
			continue
		}
		return reply, nil // success
	}
	return nil, err // NB: returns the last error
}

131
func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (*dhtpb.Message, error) {
Jeromy's avatar
Jeromy committed
132
	e := log.EventBegin(ctx, "sendRoutingRequest", px.Host.ID(), remote, logging.Pair("request", m))
133 134 135
	defer e.Done()
	if err := px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil {
		e.SetError(err)
136 137
		return nil, err
	}
138
	s, err := px.Host.NewStream(ctx, ProtocolSNR, remote)
139
	if err != nil {
140
		e.SetError(err)
141 142 143 144 145
		return nil, err
	}
	defer s.Close()
	r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
	w := ggio.NewDelimitedWriter(s)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
146
	if err = w.WriteMsg(m); err != nil {
147
		e.SetError(err)
148 149 150
		return nil, err
	}

151 152 153
	response := &dhtpb.Message{}
	if err = r.ReadMsg(response); err != nil {
		e.SetError(err)
154 155 156
		return nil, err
	}
	// need ctx expiration?
157 158 159 160
	if response == nil {
		err := errors.New("no response to request")
		e.SetError(err)
		return nil, err
161
	}
Jeromy's avatar
Jeromy committed
162
	e.Append(logging.Pair("response", response))
163
	e.Append(logging.Pair("uuid", loggables.Uuid("foo")))
164
	return response, nil
165
}
166

167 168
func sortedByKey(peers []peer.ID, skey string) []peer.ID {
	target := kbucket.ConvertKey(key.Key(skey))
169 170
	return kbucket.SortClosestPeers(peers, target)
}