standard.go 4.76 KB
Newer Older
1 2 3
package proxy

import (
4 5 6 7 8 9 10 11 12 13
	ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	host "github.com/ipfs/go-ipfs/p2p/host"
	inet "github.com/ipfs/go-ipfs/p2p/net"
	peer "github.com/ipfs/go-ipfs/p2p/peer"
	dhtpb "github.com/ipfs/go-ipfs/routing/dht/pb"
	kbucket "github.com/ipfs/go-ipfs/routing/kbucket"
	eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
	"github.com/ipfs/go-ipfs/util"
	errors "github.com/ipfs/go-ipfs/util/debugerror"
14 15
)

16
const ProtocolSNR = "/ipfs/supernoderouting"
17

18
var log = eventlog.Logger("supernode/proxy")
Brian Tiger Chow's avatar
Brian Tiger Chow committed
19

20
type Proxy interface {
21
	Bootstrap(context.Context) error
22
	HandleStream(inet.Stream)
23 24 25 26 27
	SendMessage(ctx context.Context, m *dhtpb.Message) error
	SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error)
}

type standard struct {
28 29 30 31
	Host host.Host

	remoteInfos []peer.PeerInfo // addr required for bootstrapping
	remoteIDs   []peer.ID       // []ID is required for each req. here, cached for performance.
32 33
}

34
func Standard(h host.Host, remotes []peer.PeerInfo) Proxy {
35 36 37 38 39
	var ids []peer.ID
	for _, remote := range remotes {
		ids = append(ids, remote.ID)
	}
	return &standard{h, remotes, ids}
40 41
}

42
func (px *standard) Bootstrap(ctx context.Context) error {
43
	var cxns []peer.PeerInfo
44
	for _, info := range px.remoteInfos {
45
		if err := px.Host.Connect(ctx, info); err != nil {
46
			continue
47
		}
48 49 50 51 52
		cxns = append(cxns, info)
	}
	if len(cxns) == 0 {
		log.Critical("unable to bootstrap to any supernode routers")
	} else {
Brian Tiger Chow's avatar
fix log  
Brian Tiger Chow committed
53
		log.Infof("bootstrapped to %d supernode routers: %s", len(cxns), cxns)
54
	}
55 56 57
	return nil
}

58
func (p *standard) HandleStream(s inet.Stream) {
59
	// TODO(brian): Should clients be able to satisfy requests?
60
	log.Error("supernode client received (dropped) a routing message from", s.Conn().RemotePeer())
61
	s.Close()
62
}
63

64 65
const replicationFactor = 2

Brian Tiger Chow's avatar
Brian Tiger Chow committed
66 67 68
// SendMessage sends message to each remote sequentially (randomized order),
// stopping after the first successful response. If all fail, returns the last
// error.
69
func (px *standard) SendMessage(ctx context.Context, m *dhtpb.Message) error {
70
	var err error
71
	var numSuccesses int
72
	for _, remote := range sortedByKey(px.remoteIDs, m.GetKey()) {
73 74 75
		if err = px.sendMessage(ctx, m, remote); err != nil { // careful don't re-declare err!
			continue
		}
76 77 78 79 80 81 82
		numSuccesses++
		switch m.GetType() {
		case dhtpb.Message_ADD_PROVIDER, dhtpb.Message_PUT_VALUE:
			if numSuccesses < replicationFactor {
				continue
			}
		}
83 84 85 86 87
		return nil // success
	}
	return err // NB: returns the last error
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
88 89 90 91 92 93 94 95 96
func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) (err error) {
	e := log.EventBegin(ctx, "sendRoutingMessage", px.Host.ID(), remote, m)
	defer func() {
		if err != nil {
			e.SetError(err)
		}
		e.Done()
	}()
	if err = px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil {
97 98
		return err
	}
99
	s, err := px.Host.NewStream(ProtocolSNR, remote)
100 101 102 103 104 105 106 107 108 109 110
	if err != nil {
		return err
	}
	defer s.Close()
	pbw := ggio.NewDelimitedWriter(s)
	if err := pbw.WriteMsg(m); err != nil {
		return errors.Wrap(err)
	}
	return nil
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
111 112 113
// SendRequest sends the request to each remote sequentially (randomized order),
// stopping after the first successful response. If all fail, returns the last
// error.
114
func (px *standard) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) {
115
	var err error
116
	for _, remote := range sortedByKey(px.remoteIDs, m.GetKey()) {
117 118 119 120 121 122 123 124 125 126
		var reply *dhtpb.Message
		reply, err = px.sendRequest(ctx, m, remote) // careful don't redeclare err!
		if err != nil {
			continue
		}
		return reply, nil // success
	}
	return nil, err // NB: returns the last error
}

127 128 129 130 131
func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (*dhtpb.Message, error) {
	e := log.EventBegin(ctx, "sendRoutingRequest", px.Host.ID(), remote, eventlog.Pair("request", m))
	defer e.Done()
	if err := px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil {
		e.SetError(err)
132 133
		return nil, err
	}
134
	s, err := px.Host.NewStream(ProtocolSNR, remote)
135
	if err != nil {
136
		e.SetError(err)
137 138 139 140 141
		return nil, err
	}
	defer s.Close()
	r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
	w := ggio.NewDelimitedWriter(s)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
142
	if err = w.WriteMsg(m); err != nil {
143
		e.SetError(err)
144 145 146
		return nil, err
	}

147 148 149
	response := &dhtpb.Message{}
	if err = r.ReadMsg(response); err != nil {
		e.SetError(err)
150 151 152
		return nil, err
	}
	// need ctx expiration?
153 154 155 156
	if response == nil {
		err := errors.New("no response to request")
		e.SetError(err)
		return nil, err
157
	}
158 159 160
	e.Append(eventlog.Pair("response", response))
	e.Append(eventlog.Pair("uuid", eventlog.Uuid("foo")))
	return response, nil
161
}
162 163 164 165 166

func sortedByKey(peers []peer.ID, key string) []peer.ID {
	target := kbucket.ConvertKey(util.Key(key))
	return kbucket.SortClosestPeers(peers, target)
}