testinstance.go 4.97 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
package testinstance

import (
	"context"
	"time"

	"github.com/ipfs/go-datastore"
	ds "github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/delayed"
	ds_sync "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	delay "github.com/ipfs/go-ipfs-delay"
	"github.com/ipld/go-ipld-prime"
	peer "github.com/libp2p/go-libp2p-core/peer"
	p2ptestutil "github.com/libp2p/go-libp2p-netutil"
	tnet "github.com/libp2p/go-libp2p-testing/net"
17 18 19 20 21 22

	graphsync "github.com/ipfs/go-graphsync"
	tn "github.com/ipfs/go-graphsync/benchmarks/testnet"
	gsimpl "github.com/ipfs/go-graphsync/impl"
	gsnet "github.com/ipfs/go-graphsync/network"
	"github.com/ipfs/go-graphsync/storeutil"
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
)

// TempDirGenerator is any interface that can generate temporary directories
type TempDirGenerator interface {
	TempDir() string
}

// NewTestInstanceGenerator generates a new InstanceGenerator for the given
// testnet
func NewTestInstanceGenerator(ctx context.Context, net tn.Network, gsOptions []gsimpl.Option, tempDirGenerator TempDirGenerator) InstanceGenerator {
	ctx, cancel := context.WithCancel(ctx)
	return InstanceGenerator{
		net:              net,
		seq:              0,
		ctx:              ctx, // TODO take ctx as param to Next, Instances
		cancel:           cancel,
		gsOptions:        gsOptions,
		tempDirGenerator: tempDirGenerator,
	}
}

// InstanceGenerator generates new test instances of bitswap+dependencies
type InstanceGenerator struct {
	seq              int
	net              tn.Network
	ctx              context.Context
	cancel           context.CancelFunc
	gsOptions        []gsimpl.Option
	tempDirGenerator TempDirGenerator
}

// Close closes the clobal context, shutting down all test instances
func (g *InstanceGenerator) Close() error {
	g.cancel()
	return nil // for Closer interface
}

// Next generates a new instance of graphsync + dependencies
func (g *InstanceGenerator) Next() (Instance, error) {
	g.seq++
	p, err := p2ptestutil.RandTestBogusIdentity()
	if err != nil {
		return Instance{}, err
	}
	return NewInstance(g.ctx, g.net, p, g.gsOptions, g.tempDirGenerator.TempDir())
}

// Instances creates N test instances of bitswap + dependencies and connects
// them to each other
func (g *InstanceGenerator) Instances(n int) ([]Instance, error) {
	var instances []Instance
	for j := 0; j < n; j++ {
		inst, err := g.Next()
		if err != nil {
			return nil, err
		}
		instances = append(instances, inst)
	}
	ConnectInstances(instances)
	return instances, nil
}

// ConnectInstances connects the given instances to each other
func ConnectInstances(instances []Instance) {
	for i, inst := range instances {
		for j := i + 1; j < len(instances); j++ {
			oinst := instances[j]
			err := inst.Adapter.ConnectTo(context.Background(), oinst.Peer)
			if err != nil {
				panic(err.Error())
			}
		}
	}
}

// Close closes multiple instances at once
func Close(instances []Instance) error {
	for _, i := range instances {
		if err := i.Close(); err != nil {
			return err
		}
	}
	return nil
}

// Instance is a test instance of bitswap + dependencies for integration testing
type Instance struct {
	Peer            peer.ID
	Loader          ipld.Loader
	Storer          ipld.Storer
	Exchange        graphsync.GraphExchange
	BlockStore      blockstore.Blockstore
	Adapter         gsnet.GraphSyncNetwork
	blockstoreDelay delay.D
	ds              datastore.Batching
}

// Close closes the associated datastore
func (i *Instance) Close() error {
	return i.ds.Close()
}

// Blockstore returns the block store for this test instance
func (i *Instance) Blockstore() blockstore.Blockstore {
	return i.BlockStore
}

// SetBlockstoreLatency customizes the artificial delay on receiving blocks
// from a blockstore test instance.
func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
	return i.blockstoreDelay.Set(t)
}

// NewInstance creates a test bitswap instance.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// instances. To safeguard, use the InstanceGenerator to generate instances. It's
// just a much better idea.
func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, gsOptions []gsimpl.Option, tempDir string) (Instance, error) {
	bsdelay := delay.Fixed(0)

	adapter := net.Adapter(p)
	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
	bstore, err := blockstore.CachedBlockstore(ctx,
		blockstore.NewBlockstore(dstore),
		blockstore.DefaultCacheOpts())
	if err != nil {
		return Instance{}, err
	}

	loader := storeutil.LoaderForBlockstore(bstore)
	storer := storeutil.StorerForBlockstore(bstore)
	gs := gsimpl.New(ctx, adapter, loader, storer, gsOptions...)
	gs.RegisterIncomingRequestHook(func(p peer.ID, request graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
		hookActions.ValidateRequest()
	})

	return Instance{
		Adapter:         adapter,
		Peer:            p.ID(),
		Exchange:        gs,
		Loader:          loader,
		Storer:          storer,
		BlockStore:      bstore,
		blockstoreDelay: bsdelay,
		ds:              dstore,
	}, nil
}