dht_options.go 12.7 KB
Newer Older
1 2 3 4 5 6 7 8
package dht

import (
	"fmt"
	"time"

	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
Alan Shaw's avatar
Alan Shaw committed
9
	"github.com/ipfs/go-ipns"
10
	"github.com/libp2p/go-libp2p-core/host"
11 12
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
13
	"github.com/libp2p/go-libp2p-core/protocol"
Alan Shaw's avatar
Alan Shaw committed
14
	"github.com/libp2p/go-libp2p-kad-dht/providers"
15
	record "github.com/libp2p/go-libp2p-record"
16 17 18 19 20 21 22 23 24 25 26 27 28
)

// ModeOpt describes what mode the dht should operate in
type ModeOpt int

const (
	// ModeAuto utilizes EvtLocalReachabilityChanged events sent over the event bus to dynamically switch the DHT
	// between Client and Server modes based on network conditions
	ModeAuto ModeOpt = iota
	// ModeClient operates the DHT as a client only, it cannot respond to incoming queries
	ModeClient
	// ModeServer operates the DHT as a server, it can both send and respond to queries
	ModeServer
29 30
	// ModeAutoServer operates in the same way as ModeAuto, but acts as a server when reachability is unknown
	ModeAutoServer
31 32
)

Alan Shaw's avatar
Alan Shaw committed
33
// DefaultPrefix is the application specific prefix attached to all DHT protocols by default.
Adin Schmahmann's avatar
Adin Schmahmann committed
34 35
const DefaultPrefix protocol.ID = "/ipfs"

36 37
// Options is a structure containing all the options that can be used when constructing a DHT.
type config struct {
38 39 40 41 42 43 44 45 46 47 48
	datastore        ds.Batching
	validator        record.Validator
	validatorChanged bool // if true implies that the validator has been changed and that defaults should not be used
	mode             ModeOpt
	protocolPrefix   protocol.ID
	bucketSize       int
	concurrency      int
	resiliency       int
	maxRecordAge     time.Duration
	enableProviders  bool
	enableValues     bool
Alan Shaw's avatar
Alan Shaw committed
49
	providersOptions []providers.Option
50
	queryPeerFilter  QueryFilterFunc
51 52 53

	routingTable struct {
		refreshQueryTimeout time.Duration
Aarsh Shah's avatar
Aarsh Shah committed
54
		refreshInterval     time.Duration
55 56
		autoRefresh         bool
		latencyTolerance    time.Duration
Aarsh Shah's avatar
Aarsh Shah committed
57
		checkInterval       time.Duration
58
		peerFilter          RouteTableFilterFunc
59
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
60

61 62
	// set to true if we're operating in v1 dht compatible mode
	v1CompatibleMode bool
63 64
}

65 66 67
func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool  { return true }
func emptyRTFilter(_ *IpfsDHT, conns []network.Conn) bool { return true }

Adin Schmahmann's avatar
Adin Schmahmann committed
68 69
// apply applies the given options to this Option
func (c *config) apply(opts ...Option) error {
70
	for i, opt := range opts {
Adin Schmahmann's avatar
Adin Schmahmann committed
71
		if err := opt(c); err != nil {
72 73 74 75 76 77
			return fmt.Errorf("dht option %d failed: %s", i, err)
		}
	}
	return nil
}

78 79
// applyFallbacks sets default values that could not be applied during config creation since they are dependent
// on other configuration parameters (e.g. optA is by default 2x optB) and/or on the Host
80
func (c *config) applyFallbacks(h host.Host) error {
81
	if !c.validatorChanged {
82 83
		nsval, ok := c.validator.(record.NamespacedValidator)
		if ok {
84 85 86 87 88 89
			if _, pkFound := nsval["pk"]; !pkFound {
				nsval["pk"] = record.PublicKeyValidator{}
			}
			if _, ipnsFound := nsval["ipns"]; !ipnsFound {
				nsval["ipns"] = ipns.Validator{KeyBook: h.Peerstore()}
			}
90 91
		} else {
			return fmt.Errorf("the default validator was changed without being marked as changed")
92 93
		}
	}
94
	return nil
95 96
}

97 98 99
// Option DHT option type.
type Option func(*config) error

Adin Schmahmann's avatar
Adin Schmahmann committed
100 101
const defaultBucketSize = 20

102 103 104
// defaults are the default DHT options. This option will be automatically
// prepended to any options you pass to the DHT constructor.
var defaults = func(o *config) error {
105
	o.validator = record.NamespacedValidator{}
106
	o.datastore = dssync.MutexWrap(ds.NewMapDatastore())
Adin Schmahmann's avatar
Adin Schmahmann committed
107
	o.protocolPrefix = DefaultPrefix
108 109
	o.enableProviders = true
	o.enableValues = true
110
	o.queryPeerFilter = emptyQueryFilter
111 112

	o.routingTable.latencyTolerance = time.Minute
Aarsh Shah's avatar
Aarsh Shah committed
113 114
	o.routingTable.refreshQueryTimeout = 1 * time.Minute
	o.routingTable.refreshInterval = 10 * time.Minute
115
	o.routingTable.autoRefresh = true
116
	o.routingTable.peerFilter = emptyRTFilter
117 118
	o.maxRecordAge = time.Hour * 36

Adin Schmahmann's avatar
Adin Schmahmann committed
119
	o.bucketSize = defaultBucketSize
120
	o.concurrency = 3
Adin Schmahmann's avatar
Adin Schmahmann committed
121
	o.resiliency = 3
122

123 124
	o.v1CompatibleMode = true

125 126 127
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
128
func (c *config) validate() error {
129
	if c.protocolPrefix != DefaultPrefix {
Adin Schmahmann's avatar
Adin Schmahmann committed
130 131
		return nil
	}
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
	if c.bucketSize != defaultBucketSize {
		return fmt.Errorf("protocol prefix %s must use bucket size %d", DefaultPrefix, defaultBucketSize)
	}
	if !c.enableProviders {
		return fmt.Errorf("protocol prefix %s must have providers enabled", DefaultPrefix)
	}
	if !c.enableValues {
		return fmt.Errorf("protocol prefix %s must have values enabled", DefaultPrefix)
	}

	nsval, isNSVal := c.validator.(record.NamespacedValidator)
	if !isNSVal {
		return fmt.Errorf("protocol prefix %s must use a namespaced validator", DefaultPrefix)
	}

	if len(nsval) != 2 {
		return fmt.Errorf("protocol prefix %s must have exactly two namespaced validators - /pk and /ipns", DefaultPrefix)
	}

	if pkVal, pkValFound := nsval["pk"]; !pkValFound {
		return fmt.Errorf("protocol prefix %s must support the /pk namespaced validator", DefaultPrefix)
	} else if _, ok := pkVal.(record.PublicKeyValidator); !ok {
		return fmt.Errorf("protocol prefix %s must use the record.PublicKeyValidator for the /pk namespace", DefaultPrefix)
	}

	if ipnsVal, ipnsValFound := nsval["ipns"]; !ipnsValFound {
		return fmt.Errorf("protocol prefix %s must support the /ipns namespaced validator", DefaultPrefix)
	} else if _, ok := ipnsVal.(ipns.Validator); !ok {
		return fmt.Errorf("protocol prefix %s must use ipns.Validator for the /ipns namespace", DefaultPrefix)
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
162 163 164
	return nil
}

165 166 167
// RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
// in the routing table's cluster.
func RoutingTableLatencyTolerance(latency time.Duration) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
168 169
	return func(c *config) error {
		c.routingTable.latencyTolerance = latency
170 171 172 173 174 175 176
		return nil
	}
}

// RoutingTableRefreshQueryTimeout sets the timeout for routing table refresh
// queries.
func RoutingTableRefreshQueryTimeout(timeout time.Duration) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
177 178
	return func(c *config) error {
		c.routingTable.refreshQueryTimeout = timeout
179 180 181 182 183 184 185 186 187 188 189
		return nil
	}
}

// RoutingTableRefreshPeriod sets the period for refreshing buckets in the
// routing table. The DHT will refresh buckets every period by:
//
// 1. First searching for nearby peers to figure out how many buckets we should try to fill.
// 1. Then searching for a random key in each bucket that hasn't been queried in
//    the last refresh period.
func RoutingTableRefreshPeriod(period time.Duration) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
190
	return func(c *config) error {
Aarsh Shah's avatar
Aarsh Shah committed
191
		c.routingTable.refreshInterval = period
192 193 194 195 196 197 198 199
		return nil
	}
}

// Datastore configures the DHT to use the specified datastore.
//
// Defaults to an in-memory (temporary) map.
func Datastore(ds ds.Batching) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
200 201
	return func(c *config) error {
		c.datastore = ds
202 203 204 205 206 207 208 209
		return nil
	}
}

// Mode configures which mode the DHT operates in (Client, Server, Auto).
//
// Defaults to ModeAuto.
func Mode(m ModeOpt) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
210 211
	return func(c *config) error {
		c.mode = m
212 213 214 215 216 217
		return nil
	}
}

// Validator configures the DHT to use the specified validator.
//
218
// Defaults to a namespaced validator that can validate both public key (under the "pk"
219
// namespace) and IPNS records (under the "ipns" namespace). Setting the validator
220 221
// implies that the user wants to control the validators and therefore the default
// public key and IPNS validators will not be added.
222
func Validator(v record.Validator) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
223 224
	return func(c *config) error {
		c.validator = v
225
		c.validatorChanged = true
226 227 228 229 230
		return nil
	}
}

// NamespacedValidator adds a validator namespaced under `ns`. This option fails
231 232 233 234 235 236
// if the DHT is not using a `record.NamespacedValidator` as its validator (it
// uses one by default but this can be overridden with the `Validator` option).
// Adding a namespaced validator without changing the `Validator` will result in
// adding a new validator in addition to the default public key and IPNS validators.
// The "pk" and "ipns" namespaces cannot be overridden here unless a new `Validator`
// has been set first.
237 238 239 240 241
//
// Example: Given a validator registered as `NamespacedValidator("ipns",
// myValidator)`, all records with keys starting with `/ipns/` will be validated
// with `myValidator`.
func NamespacedValidator(ns string, v record.Validator) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
242
	return func(c *config) error {
243 244 245 246 247
		nsval, ok := c.validator.(record.NamespacedValidator)
		if !ok {
			return fmt.Errorf("can only add namespaced validators to a NamespacedValidator")
		}
		nsval[ns] = v
248 249 250 251
		return nil
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
252 253
// ProtocolPrefix sets an application specific prefix to be attached to all DHT protocols. For example,
// /myapp/kad/1.0.0 instead of /ipfs/kad/1.0.0. Prefix should be of the form /myapp.
254
//
Adin Schmahmann's avatar
Adin Schmahmann committed
255 256 257 258
// Defaults to dht.DefaultPrefix
func ProtocolPrefix(prefix protocol.ID) Option {
	return func(c *config) error {
		c.protocolPrefix = prefix
259 260 261 262
		return nil
	}
}

263 264 265 266 267 268 269 270 271
// ProtocolExtension adds an application specific protocol to the DHT protocol. For example,
// /ipfs/lan/kad/1.0.0 instead of /ipfs/kad/1.0.0. extension should be of the form /lan.
func ProtocolExtension(ext protocol.ID) Option {
	return func(c *config) error {
		c.protocolPrefix += ext
		return nil
	}
}

272 273 274 275
// BucketSize configures the bucket size (k in the Kademlia paper) of the routing table.
//
// The default value is 20.
func BucketSize(bucketSize int) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
276 277
	return func(c *config) error {
		c.bucketSize = bucketSize
278 279 280 281 282 283 284 285
		return nil
	}
}

// Concurrency configures the number of concurrent requests (alpha in the Kademlia paper) for a given query path.
//
// The default value is 3.
func Concurrency(alpha int) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
286 287
	return func(c *config) error {
		c.concurrency = alpha
288 289 290 291
		return nil
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
292 293 294 295 296 297 298 299 300 301 302
// Resiliency configures the number of peers closest to a target that must have responded in order for a given query
// path to complete.
//
// The default value is 3.
func Resiliency(beta int) Option {
	return func(c *config) error {
		c.resiliency = beta
		return nil
	}
}

303 304 305 306 307 308 309
// MaxRecordAge specifies the maximum time that any node will hold onto a record ("PutValue record")
// from the time its received. This does not apply to any other forms of validity that
// the record may contain.
// For example, a record may contain an ipns entry with an EOL saying its valid
// until the year 2020 (a great time in the future). For that record to stick around
// it must be rebroadcasted more frequently than once every 'MaxRecordAge'
func MaxRecordAge(maxAge time.Duration) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
310 311
	return func(c *config) error {
		c.maxRecordAge = maxAge
312 313 314 315 316 317 318 319
		return nil
	}
}

// DisableAutoRefresh completely disables 'auto-refresh' on the DHT routing
// table. This means that we will neither refresh the routing table periodically
// nor when the routing table size goes below the minimum threshold.
func DisableAutoRefresh() Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
320 321
	return func(c *config) error {
		c.routingTable.autoRefresh = false
322 323 324 325 326 327 328 329 330 331 332
		return nil
	}
}

// DisableProviders disables storing and retrieving provider records.
//
// Defaults to enabled.
//
// WARNING: do not change this unless you're using a forked DHT (i.e., a private
// network and/or distinct DHT protocols with the `Protocols` option).
func DisableProviders() Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
333 334
	return func(c *config) error {
		c.enableProviders = false
335 336 337 338
		return nil
	}
}

Alan Shaw's avatar
Alan Shaw committed
339
// DisableValues disables storing and retrieving value records (including
340 341 342 343 344 345 346
// public keys).
//
// Defaults to enabled.
//
// WARNING: do not change this unless you're using a forked DHT (i.e., a private
// network and/or distinct DHT protocols with the `Protocols` option).
func DisableValues() Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
347 348 349 350 351 352
	return func(c *config) error {
		c.enableValues = false
		return nil
	}
}

Alan Shaw's avatar
Alan Shaw committed
353
// ProvidersOptions are options passed directly to the provider manager.
354 355 356 357
//
// The provider manager adds and gets provider records from the datastore, cahing
// them in between. These options are passed to the provider manager allowing
// customisation of things like the GC interval and cache implementation.
Alan Shaw's avatar
Alan Shaw committed
358 359 360 361 362 363 364
func ProvidersOptions(opts []providers.Option) Option {
	return func(c *config) error {
		c.providersOptions = opts
		return nil
	}
}

365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
// QueryFilter sets a function that approves which peers may be dialed in a query
func QueryFilter(filter QueryFilterFunc) Option {
	return func(c *config) error {
		c.queryPeerFilter = filter
		return nil
	}
}

// RoutingTableFilter sets a function that approves which peers may be added to the routing table. The host should
// already have at least one connection to the peer under consideration.
func RoutingTableFilter(filter RouteTableFilterFunc) Option {
	return func(c *config) error {
		c.routingTable.peerFilter = filter
		return nil
	}
}

382 383 384 385 386 387 388 389 390 391 392 393 394 395
// V1CompatibleMode sets the DHT to operate in V1 compatible mode. In this mode,
// the DHT node will act like a V1 DHT node (use the V1 protocol names) but will
// use the V2 query and routing table logic.
//
// For now, this option defaults to true for backwards compatibility. In the
// near future, it will switch to false.
//
// This option is perma-unstable and may be removed in the future.
func V1CompatibleMode(enable bool) Option {
	return func(c *config) error {
		c.v1CompatibleMode = enable
		return nil
	}
}