standard.go 3.72 KB
Newer Older
1 2 3
package proxy

import (
4 5
	"math/rand"

6 7
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
8
	host "github.com/jbenet/go-ipfs/p2p/host"
9 10 11 12
	inet "github.com/jbenet/go-ipfs/p2p/net"
	peer "github.com/jbenet/go-ipfs/p2p/peer"
	dhtpb "github.com/jbenet/go-ipfs/routing/dht/pb"
	eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
13
	errors "github.com/jbenet/go-ipfs/util/debugerror"
14 15
)

16
// ProtocolSNR is the protocol identifier this package passes to
// Host.NewStream when opening a stream to a remote.
const ProtocolSNR = "/ipfs/supernoderouting"
17

18
// log is the package-level event logger, created under the name
// "supernode/proxy".
var log = eventlog.Logger("supernode/proxy")
Brian Tiger Chow's avatar
Brian Tiger Chow committed
19

20
type Proxy interface {
21
	HandleStream(inet.Stream)
22 23 24 25 26
	SendMessage(ctx context.Context, m *dhtpb.Message) error
	SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error)
}

type standard struct {
27 28
	Host    host.Host
	Remotes []peer.ID
29 30
}

31 32
func Standard(h host.Host, remotes []peer.ID) Proxy {
	return &standard{h, remotes}
33 34
}

35
// HandleStream does not serve the stream it is given: it logs an error naming
// the peer at the remote end of the stream's connection, then closes the
// stream.
func (px *standard) HandleStream(s inet.Stream) {
	// TODO(brian): Should clients be able to satisfy requests?
	sender := s.Conn().RemotePeer()
	log.Error("supernode client received (dropped) a routing message from", sender)
	s.Close()
}
40

Brian Tiger Chow's avatar
Brian Tiger Chow committed
41 42 43
// SendMessage sends message to each remote sequentially (randomized order),
// stopping after the first successful response. If all fail, returns the last
// error.
44
func (px *standard) SendMessage(ctx context.Context, m *dhtpb.Message) error {
45
	var err error
46 47
	for _, i := range rand.Perm(len(px.Remotes)) {
		remote := px.Remotes[i]
48 49 50 51 52 53 54 55
		if err = px.sendMessage(ctx, m, remote); err != nil { // careful don't re-declare err!
			continue
		}
		return nil // success
	}
	return err // NB: returns the last error
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
56 57 58 59 60 61 62 63 64
func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) (err error) {
	e := log.EventBegin(ctx, "sendRoutingMessage", px.Host.ID(), remote, m)
	defer func() {
		if err != nil {
			e.SetError(err)
		}
		e.Done()
	}()
	if err = px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil {
65 66
		return err
	}
67
	s, err := px.Host.NewStream(ProtocolSNR, remote)
68 69 70 71 72 73 74 75 76 77 78
	if err != nil {
		return err
	}
	defer s.Close()
	pbw := ggio.NewDelimitedWriter(s)
	if err := pbw.WriteMsg(m); err != nil {
		return errors.Wrap(err)
	}
	return nil
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
79 80 81
// SendRequest sends the request to each remote sequentially (randomized order),
// stopping after the first successful response. If all fail, returns the last
// error.
82
func (px *standard) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) {
83
	var err error
84 85
	for _, i := range rand.Perm(len(px.Remotes)) {
		remote := px.Remotes[i]
86 87 88 89 90 91 92 93 94 95
		var reply *dhtpb.Message
		reply, err = px.sendRequest(ctx, m, remote) // careful don't redeclare err!
		if err != nil {
			continue
		}
		return reply, nil // success
	}
	return nil, err // NB: returns the last error
}

96 97 98 99 100
func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (*dhtpb.Message, error) {
	e := log.EventBegin(ctx, "sendRoutingRequest", px.Host.ID(), remote, eventlog.Pair("request", m))
	defer e.Done()
	if err := px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil {
		e.SetError(err)
101 102
		return nil, err
	}
103
	s, err := px.Host.NewStream(ProtocolSNR, remote)
104
	if err != nil {
105
		e.SetError(err)
106 107 108 109 110
		return nil, err
	}
	defer s.Close()
	r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
	w := ggio.NewDelimitedWriter(s)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
111
	if err = w.WriteMsg(m); err != nil {
112
		e.SetError(err)
113 114 115
		return nil, err
	}

116 117 118
	response := &dhtpb.Message{}
	if err = r.ReadMsg(response); err != nil {
		e.SetError(err)
119 120 121
		return nil, err
	}
	// need ctx expiration?
122 123 124 125
	if response == nil {
		err := errors.New("no response to request")
		e.SetError(err)
		return nil, err
126
	}
127 128 129
	e.Append(eventlog.Pair("response", response))
	e.Append(eventlog.Pair("uuid", eventlog.Uuid("foo")))
	return response, nil
130
}