dht_options.go 9.78 KB
Newer Older
1 2 3 4 5 6 7 8
package dht

import (
	"fmt"
	"time"

	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
9 10
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
11
	"github.com/libp2p/go-libp2p-core/protocol"
12
	record "github.com/libp2p/go-libp2p-record"
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
)

// ModeOpt describes what mode the dht should operate in
type ModeOpt int

const (
	// ModeAuto utilizes EvtLocalReachabilityChanged events sent over the event bus to dynamically switch the DHT
	// between Client and Server modes based on network conditions
	ModeAuto ModeOpt = iota
	// ModeClient operates the DHT as a client only, it cannot respond to incoming queries
	ModeClient
	// ModeServer operates the DHT as a server, it can both send and respond to queries
	ModeServer
)

Adin Schmahmann's avatar
Adin Schmahmann committed
28 29
const DefaultPrefix protocol.ID = "/ipfs"

30 31 32 33 34
// Options is a structure containing all the options that can be used when constructing a DHT.
type config struct {
	datastore       ds.Batching
	validator       record.Validator
	mode            ModeOpt
Adin Schmahmann's avatar
Adin Schmahmann committed
35
	protocolPrefix  protocol.ID
36 37
	bucketSize      int
	concurrency     int
Adin Schmahmann's avatar
Adin Schmahmann committed
38
	resiliency      int
39 40 41
	maxRecordAge    time.Duration
	enableProviders bool
	enableValues    bool
42
	queryPeerFilter QueryFilterFunc
43 44 45 46 47 48

	routingTable struct {
		refreshQueryTimeout time.Duration
		refreshPeriod       time.Duration
		autoRefresh         bool
		latencyTolerance    time.Duration
Aarsh Shah's avatar
Aarsh Shah committed
49
		checkInterval       time.Duration
50
		peerFilter          RouteTableFilterFunc
51
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
52 53 54 55 56 57

	// internal parameters, not publicly exposed
	protocols, serverProtocols []protocol.ID

	// test parameters
	testProtocols []protocol.ID
58 59
}

60 61 62
func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool  { return true }
func emptyRTFilter(_ *IpfsDHT, conns []network.Conn) bool { return true }

Adin Schmahmann's avatar
Adin Schmahmann committed
63 64
// apply applies the given options to this Option
func (c *config) apply(opts ...Option) error {
65
	for i, opt := range opts {
Adin Schmahmann's avatar
Adin Schmahmann committed
66
		if err := opt(c); err != nil {
67 68 69 70 71 72 73 74 75
			return fmt.Errorf("dht option %d failed: %s", i, err)
		}
	}
	return nil
}

// Option DHT option type.
type Option func(*config) error

Adin Schmahmann's avatar
Adin Schmahmann committed
76 77
const defaultBucketSize = 20

78 79 80 81 82 83 84
// defaults are the default DHT options. This option will be automatically
// prepended to any options you pass to the DHT constructor.
var defaults = func(o *config) error {
	o.validator = record.NamespacedValidator{
		"pk": record.PublicKeyValidator{},
	}
	o.datastore = dssync.MutexWrap(ds.NewMapDatastore())
Adin Schmahmann's avatar
Adin Schmahmann committed
85
	o.protocolPrefix = DefaultPrefix
86 87
	o.enableProviders = true
	o.enableValues = true
88
	o.queryPeerFilter = emptyQueryFilter
89 90

	o.routingTable.latencyTolerance = time.Minute
Aarsh Shah's avatar
Aarsh Shah committed
91
	o.routingTable.refreshQueryTimeout = 30 * time.Second
Aarsh Shah's avatar
Aarsh Shah committed
92
	o.routingTable.refreshPeriod = 10 * time.Minute
93
	o.routingTable.autoRefresh = true
94
	o.routingTable.peerFilter = emptyRTFilter
95 96
	o.maxRecordAge = time.Hour * 36

Adin Schmahmann's avatar
Adin Schmahmann committed
97
	o.bucketSize = defaultBucketSize
98
	o.concurrency = 3
Adin Schmahmann's avatar
Adin Schmahmann committed
99
	o.resiliency = 3
100 101 102 103

	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
func (c *config) validate() error {
	if c.protocolPrefix == DefaultPrefix {
		if c.bucketSize != defaultBucketSize {
			return fmt.Errorf("protocol prefix %s must use bucket size %d", DefaultPrefix, defaultBucketSize)
		}
		if !c.enableProviders {
			return fmt.Errorf("protocol prefix %s must have providers enabled", DefaultPrefix)
		}
		if !c.enableValues {
			return fmt.Errorf("protocol prefix %s must have values enabled", DefaultPrefix)
		}
		if nsval, ok := c.validator.(record.NamespacedValidator); !ok {
			return fmt.Errorf("protocol prefix %s must use a namespaced validator", DefaultPrefix)
		} else if len(nsval) > 2 || nsval["pk"] == nil || nsval["ipns"] == nil {
			return fmt.Errorf("protocol prefix %s must support only the /pk and /ipns namespaces", DefaultPrefix)
		}
		return nil
	}
	return nil
}

Aarsh Shah's avatar
Aarsh Shah committed
125 126 127 128 129 130 131 132
// RoutingTableCheckInterval is the interval between two runs of the RT cleanup routine.
func RoutingTableCheckInterval(i time.Duration) Option {
	return func(c *config) error {
		c.routingTable.checkInterval = i
		return nil
	}
}

133 134 135
// RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
// in the routing table's cluster.
func RoutingTableLatencyTolerance(latency time.Duration) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
136 137
	return func(c *config) error {
		c.routingTable.latencyTolerance = latency
138 139 140 141 142 143 144
		return nil
	}
}

// RoutingTableRefreshQueryTimeout sets the timeout for routing table refresh
// queries.
func RoutingTableRefreshQueryTimeout(timeout time.Duration) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
145 146
	return func(c *config) error {
		c.routingTable.refreshQueryTimeout = timeout
147 148 149 150 151 152 153 154 155 156 157
		return nil
	}
}

// RoutingTableRefreshPeriod sets the period for refreshing buckets in the
// routing table. The DHT will refresh buckets every period by:
//
// 1. First searching for nearby peers to figure out how many buckets we should try to fill.
// 1. Then searching for a random key in each bucket that hasn't been queried in
//    the last refresh period.
func RoutingTableRefreshPeriod(period time.Duration) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
158 159
	return func(c *config) error {
		c.routingTable.refreshPeriod = period
160 161 162 163 164 165 166 167
		return nil
	}
}

// Datastore configures the DHT to use the specified datastore.
//
// Defaults to an in-memory (temporary) map.
func Datastore(ds ds.Batching) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
168 169
	return func(c *config) error {
		c.datastore = ds
170 171 172 173 174 175 176 177
		return nil
	}
}

// Mode configures which mode the DHT operates in (Client, Server, Auto).
//
// Defaults to ModeAuto.
func Mode(m ModeOpt) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
178 179
	return func(c *config) error {
		c.mode = m
180 181 182 183 184 185 186 187
		return nil
	}
}

// Validator configures the DHT to use the specified validator.
//
// Defaults to a namespaced validator that can only validate public keys.
func Validator(v record.Validator) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
188 189
	return func(c *config) error {
		c.validator = v
190 191 192 193 194 195 196 197 198 199 200 201
		return nil
	}
}

// NamespacedValidator adds a validator namespaced under `ns`. This option fails
// if the DHT is not using a `record.NamespacedValidator` as it's validator (it
// uses one by default but this can be overridden with the `Validator` option).
//
// Example: Given a validator registered as `NamespacedValidator("ipns",
// myValidator)`, all records with keys starting with `/ipns/` will be validated
// with `myValidator`.
func NamespacedValidator(ns string, v record.Validator) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
202 203
	return func(c *config) error {
		nsval, ok := c.validator.(record.NamespacedValidator)
204 205 206 207 208 209 210 211
		if !ok {
			return fmt.Errorf("can only add namespaced validators to a NamespacedValidator")
		}
		nsval[ns] = v
		return nil
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
212 213
// ProtocolPrefix sets an application specific prefix to be attached to all DHT protocols. For example,
// /myapp/kad/1.0.0 instead of /ipfs/kad/1.0.0. Prefix should be of the form /myapp.
214
//
Adin Schmahmann's avatar
Adin Schmahmann committed
215 216 217 218
// Defaults to dht.DefaultPrefix
func ProtocolPrefix(prefix protocol.ID) Option {
	return func(c *config) error {
		c.protocolPrefix = prefix
219 220 221 222 223 224 225 226
		return nil
	}
}

// BucketSize configures the bucket size (k in the Kademlia paper) of the routing table.
//
// The default value is 20.
func BucketSize(bucketSize int) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
227 228
	return func(c *config) error {
		c.bucketSize = bucketSize
229 230 231 232 233 234 235 236
		return nil
	}
}

// Concurrency configures the number of concurrent requests (alpha in the Kademlia paper) for a given query path.
//
// The default value is 3.
func Concurrency(alpha int) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
237 238
	return func(c *config) error {
		c.concurrency = alpha
239 240 241 242
		return nil
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
243 244 245 246 247 248 249 250 251 252 253
// Resiliency configures the number of peers closest to a target that must have responded in order for a given query
// path to complete.
//
// The default value is 3.
func Resiliency(beta int) Option {
	return func(c *config) error {
		c.resiliency = beta
		return nil
	}
}

254 255 256 257 258 259 260
// MaxRecordAge specifies the maximum time that any node will hold onto a record ("PutValue record")
// from the time its received. This does not apply to any other forms of validity that
// the record may contain.
// For example, a record may contain an ipns entry with an EOL saying its valid
// until the year 2020 (a great time in the future). For that record to stick around
// it must be rebroadcasted more frequently than once every 'MaxRecordAge'
func MaxRecordAge(maxAge time.Duration) Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
261 262
	return func(c *config) error {
		c.maxRecordAge = maxAge
263 264 265 266 267 268 269 270
		return nil
	}
}

// DisableAutoRefresh completely disables 'auto-refresh' on the DHT routing
// table. This means that we will neither refresh the routing table periodically
// nor when the routing table size goes below the minimum threshold.
func DisableAutoRefresh() Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
271 272
	return func(c *config) error {
		c.routingTable.autoRefresh = false
273 274 275 276 277 278 279 280 281 282 283
		return nil
	}
}

// DisableProviders disables storing and retrieving provider records.
//
// Defaults to enabled.
//
// WARNING: do not change this unless you're using a forked DHT (i.e., a private
// network and/or distinct DHT protocols with the `Protocols` option).
func DisableProviders() Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
284 285
	return func(c *config) error {
		c.enableProviders = false
286 287 288 289 290 291 292 293 294 295 296 297
		return nil
	}
}

// DisableProviders disables storing and retrieving value records (including
// public keys).
//
// Defaults to enabled.
//
// WARNING: do not change this unless you're using a forked DHT (i.e., a private
// network and/or distinct DHT protocols with the `Protocols` option).
func DisableValues() Option {
Adin Schmahmann's avatar
Adin Schmahmann committed
298 299 300 301 302 303
	return func(c *config) error {
		c.enableValues = false
		return nil
	}
}

304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
// QueryFilter sets a function that approves which peers may be dialed in a query
func QueryFilter(filter QueryFilterFunc) Option {
	return func(c *config) error {
		c.queryPeerFilter = filter
		return nil
	}
}

// RoutingTableFilter sets a function that approves which peers may be added to the routing table. The host should
// already have at least one connection to the peer under consideration.
func RoutingTableFilter(filter RouteTableFilterFunc) Option {
	return func(c *config) error {
		c.routingTable.peerFilter = filter
		return nil
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
321 322 323 324 325
// customProtocols is only to be used for testing. It sets the protocols that the DHT listens on and queries with to be
// the ones passed in. The custom protocols are still augmented by the Prefix.
func customProtocols(protos ...protocol.ID) Option {
	return func(c *config) error {
		c.testProtocols = protos
326 327 328
		return nil
	}
}