server.go 6.5 KB
Newer Older
1
package supernode
2 3

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5
	"errors"
6 7
	"fmt"

8
	proxy "github.com/ipfs/go-ipfs/routing/supernode/proxy"
Jeromy's avatar
Jeromy committed
9
	dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"
Jeromy's avatar
Jeromy committed
10

11
	pstore "gx/ipfs/QmQMQ2RUjnaEEX8ybmrhuFFGhAwPjyL1Eo6ZoJGD7aAccM/go-libp2p-peerstore"
Jeromy's avatar
Jeromy committed
12
	datastore "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
13
	dhtpb "gx/ipfs/QmUpZqxzrUoyDsgWXDri9yYgi5r5EK7J5Tan1MbgnawYLx/go-libp2p-kad-dht/pb"
Jeromy's avatar
Jeromy committed
14
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
15 16 17
	peer "gx/ipfs/QmZcUPvPhD1Xvk6mwijYF8AfR3mG31S1YsEfHG4khrFPRr/go-libp2p-peer"
	record "gx/ipfs/QmZp9q8DbrGLztoxpkTC62mnRayRwHcAzGJJ8AvYRwjanR/go-libp2p-record"
	pb "gx/ipfs/QmZp9q8DbrGLztoxpkTC62mnRayRwHcAzGJJ8AvYRwjanR/go-libp2p-record/pb"
18 19 20 21 22
)

// Server handles routing queries using a database backend
type Server struct {
	local           peer.ID
Jeromy's avatar
Jeromy committed
23
	routingBackend  datastore.Datastore
Jeromy's avatar
Jeromy committed
24
	peerstore       pstore.Peerstore
25 26 27
	*proxy.Loopback // so server can be injected into client
}

28
// NewServer creates a new Supernode routing Server
Jeromy's avatar
Jeromy committed
29
func NewServer(ds datastore.Datastore, ps pstore.Peerstore, local peer.ID) (*Server, error) {
30
	s := &Server{local, ds, ps, nil}
31 32 33 34 35 36 37
	s.Loopback = &proxy.Loopback{
		Handler: s,
		Local:   local,
	}
	return s, nil
}

38 39 40 41
// Bootstrap is a no-op: it ignores ctx, does not touch the server and always
// returns nil.
func (*Server) Bootstrap(ctx context.Context) error {
	return nil
}

42 43 44 45 46 47 48 49 50 51
// HandleRequest implements the proxy.RequestHandler interface. This is
// where requests are received from the outside world.
//
// The request is passed to handleMessage and only the reply message is
// returned. The result is nil whenever handleMessage produces no reply.
func (s *Server) HandleRequest(ctx context.Context, p peer.ID, req *dhtpb.Message) *dhtpb.Message {
	_, response := s.handleMessage(ctx, p, req) // ignore response peer. it's local.
	return response
}

func (s *Server) handleMessage(
	ctx context.Context, p peer.ID, req *dhtpb.Message) (peer.ID, *dhtpb.Message) {

52
	defer log.EventBegin(ctx, "routingMessageReceived", req, p).Done()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
53

54 55 56 57
	var response = dhtpb.NewMessage(req.GetType(), req.GetKey(), req.GetClusterLevel())
	switch req.GetType() {

	case dhtpb.Message_GET_VALUE:
Jeromy's avatar
Jeromy committed
58
		rawRecord, err := getRoutingRecord(s.routingBackend, req.GetKey())
59 60 61
		if err != nil {
			return "", nil
		}
62
		response.Record = rawRecord
63 64 65
		return p, response

	case dhtpb.Message_PUT_VALUE:
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66 67 68 69 70 71
		// FIXME: verify complains that the peer's ID is not present in the
		// peerstore. Mocknet problem?
		// if err := verify(s.peerstore, req.GetRecord()); err != nil {
		// 	log.Event(ctx, "validationFailed", req, p)
		// 	return "", nil
		// }
Jeromy's avatar
Jeromy committed
72
		putRoutingRecord(s.routingBackend, req.GetKey(), req.GetRecord())
73
		return p, req
74 75 76

	case dhtpb.Message_FIND_NODE:
		p := s.peerstore.PeerInfo(peer.ID(req.GetKey()))
77
		pri := []dhtpb.PeerRoutingInfo{
rht's avatar
rht committed
78
			{
79 80 81 82 83
				PeerInfo: p,
				// Connectedness: TODO
			},
		}
		response.CloserPeers = dhtpb.PeerRoutingInfosToPBPeers(pri)
84 85 86
		return p.ID, response

	case dhtpb.Message_ADD_PROVIDER:
87 88 89 90 91
		for _, provider := range req.GetProviderPeers() {
			providerID := peer.ID(provider.GetId())
			if providerID == p {
				store := []*dhtpb.Message_Peer{provider}
				storeProvidersToPeerstore(s.peerstore, p, store)
Jeromy's avatar
Jeromy committed
92
				if err := putRoutingProviders(s.routingBackend, req.GetKey(), store); err != nil {
93 94 95 96 97
					return "", nil
				}
			} else {
				log.Event(ctx, "addProviderBadRequest", p, req)
			}
98 99 100 101
		}
		return "", nil

	case dhtpb.Message_GET_PROVIDERS:
Jeromy's avatar
Jeromy committed
102
		providers, err := getRoutingProviders(s.routingBackend, req.GetKey())
103 104
		if err != nil {
			return "", nil
105
		}
106
		response.ProviderPeers = providers
107 108 109 110 111 112 113 114 115 116 117
		return p, response

	case dhtpb.Message_PING:
		return p, req
	default:
	}
	return "", nil
}

// Compile-time checks that *Server satisfies the proxy interfaces.
var (
	_ proxy.RequestHandler = (*Server)(nil)
	_ proxy.Proxy          = (*Server)(nil)
)
118

Jeromy's avatar
Jeromy committed
119
func getRoutingRecord(ds datastore.Datastore, k string) (*pb.Record, error) {
120
	dskey := dshelp.NewKeyFromBinary([]byte(k))
121 122
	val, err := ds.Get(dskey)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
123
		return nil, err
124 125 126 127 128
	}
	recordBytes, ok := val.([]byte)
	if !ok {
		return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
	}
129
	var record pb.Record
130 131 132 133 134 135
	if err := proto.Unmarshal(recordBytes, &record); err != nil {
		return nil, errors.New("failed to unmarshal dht record from datastore")
	}
	return &record, nil
}

Jeromy's avatar
Jeromy committed
136
func putRoutingRecord(ds datastore.Datastore, k string, value *pb.Record) error {
137 138 139 140
	data, err := proto.Marshal(value)
	if err != nil {
		return err
	}
141
	dskey := dshelp.NewKeyFromBinary([]byte(k))
142 143 144 145 146 147 148
	// TODO namespace
	if err := ds.Put(dskey, data); err != nil {
		return err
	}
	return nil
}

Jeromy's avatar
Jeromy committed
149 150
func putRoutingProviders(ds datastore.Datastore, k string, newRecords []*dhtpb.Message_Peer) error {
	log.Event(context.Background(), "putRoutingProviders")
151
	oldRecords, err := getRoutingProviders(ds, k)
152 153
	if err != nil {
		return err
154
	}
155 156 157 158 159 160 161
	mergedRecords := make(map[string]*dhtpb.Message_Peer)
	for _, provider := range oldRecords {
		mergedRecords[provider.GetId()] = provider // add original records
	}
	for _, provider := range newRecords {
		mergedRecords[provider.GetId()] = provider // overwrite old record if new exists
	}
162
	var protomsg dhtpb.Message
163 164 165 166
	protomsg.ProviderPeers = make([]*dhtpb.Message_Peer, 0, len(mergedRecords))
	for _, provider := range mergedRecords {
		protomsg.ProviderPeers = append(protomsg.ProviderPeers, provider)
	}
167 168 169 170
	data, err := proto.Marshal(&protomsg)
	if err != nil {
		return err
	}
171
	return ds.Put(providerKey(k), data)
172 173
}

Jeromy's avatar
Jeromy committed
174
func storeProvidersToPeerstore(ps pstore.Peerstore, p peer.ID, providers []*dhtpb.Message_Peer) {
175 176 177 178 179 180 181 182
	for _, provider := range providers {
		providerID := peer.ID(provider.GetId())
		if providerID != p {
			log.Errorf("provider message came from third-party %s", p)
			continue
		}
		for _, maddr := range provider.Addresses() {
			// as a router, we want to store addresses for peers who have provided
Jeromy's avatar
Jeromy committed
183
			ps.AddAddr(p, maddr, pstore.AddressTTL)
184 185 186 187
		}
	}
}

Jeromy's avatar
Jeromy committed
188 189
func getRoutingProviders(ds datastore.Datastore, k string) ([]*dhtpb.Message_Peer, error) {
	e := log.EventBegin(context.Background(), "getProviders")
190
	defer e.Done()
191
	var providers []*dhtpb.Message_Peer
192
	if v, err := ds.Get(providerKey(k)); err == nil {
193 194 195 196 197 198 199 200 201 202
		if data, ok := v.([]byte); ok {
			var msg dhtpb.Message
			if err := proto.Unmarshal(data, &msg); err != nil {
				return nil, err
			}
			providers = append(providers, msg.GetProviderPeers()...)
		}
	}
	return providers, nil
}
203

Jeromy's avatar
Jeromy committed
204 205
// providerKey builds the datastore key for the provider set of k from the
// namespaces "routing", "providers" and k, in that order.
func providerKey(k string) datastore.Key {
	namespaces := []string{"routing", "providers", k}
	return datastore.KeyWithNamespaces(namespaces)
}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
207

208
func verify(ps pstore.Peerstore, r *pb.Record) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
209
	v := make(record.Validator)
210
	v["pk"] = record.PublicKeyValidator
Brian Tiger Chow's avatar
Brian Tiger Chow committed
211 212 213 214 215
	p := peer.ID(r.GetAuthor())
	pk := ps.PubKey(p)
	if pk == nil {
		return fmt.Errorf("do not have public key for %s", p)
	}
Jeromy's avatar
Jeromy committed
216 217 218 219
	if err := record.CheckRecordSig(r, pk); err != nil {
		return err
	}
	if err := v.VerifyRecord(r); err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
220 221 222 223
		return err
	}
	return nil
}