dual.go 7.24 KB
Newer Older
Will Scott's avatar
Will Scott committed
1 2 3 4 5 6 7 8 9 10 11 12 13
// Package dual provides an implementaiton of a split or "dual" dht, where two parallel instances
// are maintained for the global internet and the local LAN respectively.
package dual

import (
	"context"
	"fmt"
	"sync"

	"github.com/ipfs/go-cid"
	ci "github.com/libp2p/go-libp2p-core/crypto"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"
14
	"github.com/libp2p/go-libp2p-core/protocol"
Will Scott's avatar
Will Scott committed
15 16
	"github.com/libp2p/go-libp2p-core/routing"
	dht "github.com/libp2p/go-libp2p-kad-dht"
Will Scott's avatar
Will Scott committed
17
	helper "github.com/libp2p/go-libp2p-routing-helpers"
Will Scott's avatar
Will Scott committed
18 19 20 21 22 23 24 25 26
)

// DHT implements the routing interface to provide two concrete DHT implementationts for use
// in IPFS that are used to support both global network users and disjoint LAN usecases.
type DHT struct {
	WAN *dht.IpfsDHT
	LAN *dht.IpfsDHT
}

27 28 29
// DefaultLanExtension is used to differentiate local protocol requests from those on the WAN DHT.
const DefaultLanExtension protocol.ID = "/lan"

Will Scott's avatar
Will Scott committed
30 31 32 33 34 35 36 37 38 39
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*DHT)(nil)
	_ routing.Routing        = (*DHT)(nil)
	_ routing.PeerRouting    = (*DHT)(nil)
	_ routing.PubKeyFetcher  = (*DHT)(nil)
	_ routing.ValueStore     = (*DHT)(nil)
)

40
// New creates a new DualDHT instance. Options provided are forwarded on to the two concrete
Will Scott's avatar
Will Scott committed
41 42
// IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce
// the LAN-vs-WAN distinction.
43 44 45
// Note: query or routing table functional options provided as arguments to this function
// will be overriden by this constructor.
func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) {
Will Scott's avatar
Will Scott committed
46 47 48 49 50 51 52 53 54
	wanOpts := append(options,
		dht.QueryFilter(dht.PublicQueryFilter),
		dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
	)
	wan, err := dht.New(ctx, h, wanOpts...)
	if err != nil {
		return nil, err
	}

55 56
	// Unless overridden by user supplied options, the LAN DHT should default
	// to 'AutoServer' mode.
Will Scott's avatar
Will Scott committed
57
	lanOpts := append(options,
58
		dht.ProtocolExtension(DefaultLanExtension),
Will Scott's avatar
Will Scott committed
59 60 61
		dht.QueryFilter(dht.PrivateQueryFilter),
		dht.RoutingTableFilter(dht.PrivateRoutingTableFilter),
	)
Will Scott's avatar
Will Scott committed
62 63 64
	if wan.Mode() != dht.ModeClient {
		lanOpts = append(lanOpts, dht.Mode(dht.ModeServer))
	}
Will Scott's avatar
Will Scott committed
65 66 67 68 69 70 71 72 73
	lan, err := dht.New(ctx, h, lanOpts...)
	if err != nil {
		return nil, err
	}

	impl := DHT{wan, lan}
	return &impl, nil
}

Will Scott's avatar
Will Scott committed
74 75 76 77 78
// Close closes the DHT context.
func (dht *DHT) Close() error {
	return mergeErrors(dht.WAN.Close(), dht.LAN.Close())
}

Will Scott's avatar
Will Scott committed
79
func (dht *DHT) activeWAN() bool {
80
	return dht.WAN.RoutingTable().Size() > 0
Will Scott's avatar
Will Scott committed
81 82 83 84 85 86 87 88 89 90 91 92
}

// Provide adds the given cid to the content routing system.
func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error {
	if dht.activeWAN() {
		return dht.WAN.Provide(ctx, key, announce)
	}
	return dht.LAN.Provide(ctx, key, announce)
}

// FindProvidersAsync searches for peers who are able to provide a given key
func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
	reqCtx, cancel := context.WithCancel(ctx)
	outCh := make(chan peer.AddrInfo)
	wanCh := dht.WAN.FindProvidersAsync(reqCtx, key, count)
	lanCh := dht.LAN.FindProvidersAsync(reqCtx, key, count)
	go func() {
		defer cancel()
		defer close(outCh)

		found := make(map[peer.ID]struct{}, count)
		nch := 2
		var pi peer.AddrInfo
		for nch > 0 && count > 0 {
			var ok bool
			select {
			case pi, ok = <-wanCh:
				if !ok {
					wanCh = nil
					nch--
					continue
				}
			case pi, ok = <-lanCh:
				if !ok {
					lanCh = nil
					nch--
					continue
				}
			}
			// already found
			if _, ok = found[pi.ID]; ok {
				continue
			}

			select {
			case outCh <- pi:
				found[pi.ID] = struct{}{}
				count--
			case <-ctx.Done():
				return
			}
		}
	}()
	return outCh
Will Scott's avatar
Will Scott committed
135 136 137
}

// FindPeer searches for a peer with given ID
138
// Note: with signed peer records, we can change this to short circuit once either DHT returns.
Will Scott's avatar
Will Scott committed
139
func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) {
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
	var wg sync.WaitGroup
	wg.Add(2)
	var wanInfo, lanInfo peer.AddrInfo
	var wanErr, lanErr error
	go func() {
		defer wg.Done()
		wanInfo, wanErr = dht.WAN.FindPeer(ctx, pid)
	}()
	go func() {
		defer wg.Done()
		lanInfo, lanErr = dht.LAN.FindPeer(ctx, pid)
	}()

	wg.Wait()

Will Scott's avatar
Will Scott committed
155 156
	return peer.AddrInfo{
		ID:    pid,
157 158
		Addrs: append(wanInfo.Addrs, lanInfo.Addrs...),
	}, mergeErrors(wanErr, lanErr)
Will Scott's avatar
Will Scott committed
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
}

func mergeErrors(a, b error) error {
	if a == nil && b == nil {
		return nil
	} else if a != nil && b != nil {
		return fmt.Errorf("%v, %v", a, b)
	} else if a != nil {
		return a
	}
	return b
}

// Bootstrap allows callers to hint to the routing system to get into a
// Boostrapped state and remain there.
func (dht *DHT) Bootstrap(ctx context.Context) error {
	erra := dht.WAN.Bootstrap(ctx)
	errb := dht.LAN.Bootstrap(ctx)
	return mergeErrors(erra, errb)
}

// PutValue adds value corresponding to given Key.
func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
	if dht.activeWAN() {
		return dht.WAN.PutValue(ctx, key, val, opts...)
	}
	return dht.LAN.PutValue(ctx, key, val, opts...)
}

// GetValue searches for the value corresponding to given Key.
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
	reqCtx, cncl := context.WithCancel(ctx)
	defer cncl()

	resChan := make(chan []byte)
	defer close(resChan)
	errChan := make(chan error)
	defer close(errChan)
	runner := func(impl *dht.IpfsDHT, valCh chan []byte, errCh chan error) {
		val, err := impl.GetValue(reqCtx, key, opts...)
		if err != nil {
			errCh <- err
			return
		}
		valCh <- val
	}
	go runner(d.WAN, resChan, errChan)
	go runner(d.LAN, resChan, errChan)

	var err error
	var val []byte
	select {
	case val = <-resChan:
		cncl()
	case err = <-errChan:
	}

	// Drain or wait for the slower runner
	select {
	case secondVal := <-resChan:
		if val == nil {
			val = secondVal
		}
	case secondErr := <-errChan:
		if err != nil {
			err = mergeErrors(err, secondErr)
		} else if val == nil {
			err = secondErr
		}
Will Scott's avatar
Will Scott committed
228
	}
229
	return val, err
Will Scott's avatar
Will Scott committed
230 231 232 233
}

// SearchValue searches for better values from this value
func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
Will Scott's avatar
Will Scott committed
234 235
	p := helper.Parallel{Routers: []routing.Routing{dht.WAN, dht.LAN}, Validator: dht.WAN.Validator}
	return p.SearchValue(ctx, key, opts...)
Will Scott's avatar
Will Scott committed
236 237 238
}

// GetPublicKey returns the public key for the given peer.
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
func (d *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) {
	reqCtx, cncl := context.WithCancel(ctx)
	defer cncl()

	resChan := make(chan ci.PubKey)
	defer close(resChan)
	errChan := make(chan error)
	defer close(errChan)
	runner := func(impl *dht.IpfsDHT, valCh chan ci.PubKey, errCh chan error) {
		val, err := impl.GetPublicKey(reqCtx, pid)
		if err != nil {
			errCh <- err
			return
		}
		valCh <- val
Will Scott's avatar
Will Scott committed
254
	}
255 256 257 258 259 260 261 262 263
	go runner(d.WAN, resChan, errChan)
	go runner(d.LAN, resChan, errChan)

	var err error
	var val ci.PubKey
	select {
	case val = <-resChan:
		cncl()
	case err = <-errChan:
Will Scott's avatar
Will Scott committed
264
	}
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279

	// Drain or wait for the slower runner
	select {
	case secondVal := <-resChan:
		if val == nil {
			val = secondVal
		}
	case secondErr := <-errChan:
		if err != nil {
			err = mergeErrors(err, secondErr)
		} else if val == nil {
			err = secondErr
		}
	}
	return val, err
Will Scott's avatar
Will Scott committed
280
}