Commit f44ef3fe authored by Brian Tiger Chow's avatar Brian Tiger Chow

Merge pull request #538 from jbenet/refactor/core-construction

refactor(core): NewIPFSNode constructor
parents 836e5cab 63c0d416
......@@ -121,7 +121,7 @@ func doInit(configRoot string, dspathOverride string, force bool, nBitsForKeypai
func addTheWelcomeFile(conf *config.Config) error {
// TODO extract this file creation operation into a function
ctx, cancel := context.WithCancel(context.Background())
nd, err := core.NewIpfsNode(ctx, conf, false)
nd, err := core.NewIPFSNode(ctx, core.Offline(conf))
if err != nil {
return err
}
......
......@@ -188,7 +188,7 @@ func (i *cmdInvocation) constructNodeFunc(ctx context.Context) func() (*core.Ipf
// ok everything is good. set it on the invocation (for ownership)
// and return it.
i.node, err = core.NewIpfsNode(ctx, cfg, cmdctx.Online)
i.node, err = core.NewIPFSNode(ctx, core.Standard(cfg, cmdctx.Online))
return i.node, err
}
}
......
......@@ -29,7 +29,7 @@ func superviseConnections(parent context.Context,
h host.Host,
route *dht.IpfsDHT, // TODO depend on abstract interface for testing purposes
store peer.Peerstore,
peers []config.BootstrapPeer) error {
peers []peer.PeerInfo) error {
for {
ctx, _ := context.WithTimeout(parent, connectiontimeout)
......@@ -51,7 +51,7 @@ func bootstrap(ctx context.Context,
h host.Host,
r *dht.IpfsDHT,
ps peer.Peerstore,
boots []config.BootstrapPeer) error {
bootstrapPeers []peer.PeerInfo) error {
connectedPeers := h.Network().Peers()
if len(connectedPeers) >= recoveryThreshold {
......@@ -66,17 +66,6 @@ func bootstrap(ctx context.Context,
log.Event(ctx, "bootstrapStart", h.ID())
log.Debugf("%s bootstrapping to %d more nodes", h.ID(), numCxnsToCreate)
var bootstrapPeers []peer.PeerInfo
for _, bootstrap := range boots {
p, err := toPeer(bootstrap)
if err != nil {
log.Event(ctx, "bootstrapError", h.ID(), lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", h.ID(), err)
return err
}
bootstrapPeers = append(bootstrapPeers, p)
}
var notConnected []peer.PeerInfo
for _, p := range bootstrapPeers {
if h.Network().Connectedness(p.ID) != inet.Connected {
......
......@@ -6,6 +6,7 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
......@@ -15,7 +16,7 @@ import (
exchange "github.com/jbenet/go-ipfs/exchange"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
"github.com/jbenet/go-ipfs/exchange/offline"
offline "github.com/jbenet/go-ipfs/exchange/offline"
mount "github.com/jbenet/go-ipfs/fuse/mount"
merkledag "github.com/jbenet/go-ipfs/merkledag"
namesys "github.com/jbenet/go-ipfs/namesys"
......@@ -28,9 +29,11 @@ import (
pin "github.com/jbenet/go-ipfs/pin"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
util "github.com/jbenet/go-ipfs/util"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
)
const IpnsValidatorTag = "ipns"
......@@ -38,33 +41,51 @@ const kSizeBlockstoreWriteCache = 100
var log = eventlog.Logger("core")
type mode int
const (
// zero value is not a valid mode, must be explicitly set
invalidMode mode = iota
offlineMode
onlineMode
)
// IpfsNode is IPFS Core module. It represents an IPFS instance.
type IpfsNode struct {
// Self
Config *config.Config // the node's configuration
Identity peer.ID // the local node's identity
PrivateKey ic.PrivKey // the local node's private Key
onlineMode bool // alternatively, offline
// Local node
// TODO abstract as repo.Repo
Config *config.Config // the node's configuration
Datastore ds2.ThreadSafeDatastoreCloser // the local datastore
// Local node
Pinning pin.Pinner // the pinning manager
Mounts Mounts // current mount state, if any.
// Services
Peerstore peer.Peerstore // storage for other Peer instances
PeerHost p2phost.Host // the network host (server+client)
Routing routing.IpfsRouting // the routing system. recommend ipfs-dht
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Blockstore bstore.Blockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
// Online
PrivateKey ic.PrivKey // the local node's private Key
PeerHost p2phost.Host // the network host (server+client)
Routing routing.IpfsRouting // the routing system. recommend ipfs-dht
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Diagnostics *diag.Diagnostics // the diagnostics service
ctxgroup.ContextGroup
// dht allows node to Bootstrap when dht is present
// TODO privatize before merging. This is here temporarily during the
// migration of the TestNet constructor
DHT *dht.IpfsDHT
mode mode
}
// Mounts defines what the node's mount state is. This should
......@@ -75,8 +96,46 @@ type Mounts struct {
Ipns mount.Mount
}
// NewIpfsNode constructs a new IpfsNode based on the given config.
func NewIpfsNode(ctx context.Context, cfg *config.Config, online bool) (n *IpfsNode, err error) {
type ConfigOption func(ctx context.Context) (*IpfsNode, error)
func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) {
node, err := option(ctx)
if err != nil {
return nil, err
}
// Need to make sure it's perfectly clear 1) which variables are expected
// to be initialized at this point, and 2) which variables will be
// initialized after this point.
node.Blocks, err = bserv.New(node.Blockstore, node.Exchange)
if err != nil {
return nil, debugerror.Wrap(err)
}
if node.Peerstore == nil {
node.Peerstore = peer.NewPeerstore()
}
node.DAG = merkledag.NewDAGService(node.Blocks)
node.Pinning, err = pin.LoadPinner(node.Datastore, node.DAG)
if err != nil {
node.Pinning = pin.NewPinner(node.Datastore, node.DAG)
}
node.Resolver = &path.Resolver{DAG: node.DAG}
return node, nil
}
func Offline(cfg *config.Config) ConfigOption {
return Standard(cfg, false)
}
func Online(cfg *config.Config) ConfigOption {
return Standard(cfg, true)
}
// DEPRECATED: use Online, Offline functions
func Standard(cfg *config.Config, online bool) ConfigOption {
return func(ctx context.Context) (n *IpfsNode, err error) {
success := false // flip to true after all sub-system inits succeed
defer func() {
if !success && n != nil {
......@@ -87,11 +146,16 @@ func NewIpfsNode(ctx context.Context, cfg *config.Config, online bool) (n *IpfsN
if cfg == nil {
return nil, debugerror.Errorf("configuration required")
}
n = &IpfsNode{
onlineMode: online,
mode: func() mode {
if online {
return onlineMode
}
return offlineMode
}(),
Config: cfg,
}
n.ContextGroup = ctxgroup.WithContextAndTeardown(ctx, n.teardown)
ctx = n.ContextGroup.Context()
......@@ -122,20 +186,9 @@ func NewIpfsNode(ctx context.Context, cfg *config.Config, online bool) (n *IpfsN
n.Exchange = offline.Exchange(n.Blockstore)
}
n.Blocks, err = bserv.New(n.Blockstore, n.Exchange)
if err != nil {
return nil, debugerror.Wrap(err)
}
n.DAG = merkledag.NewDAGService(n.Blocks)
n.Pinning, err = pin.LoadPinner(n.Datastore, n.DAG)
if err != nil {
n.Pinning = pin.NewPinner(n.Datastore, n.DAG)
}
n.Resolver = &path.Resolver{DAG: n.DAG}
success = true
return n, nil
}
}
func (n *IpfsNode) StartOnlineServices() error {
......@@ -150,18 +203,22 @@ func (n *IpfsNode) StartOnlineServices() error {
return err
}
if err := n.startNetwork(); err != nil {
return err
peerhost, err := constructPeerHost(ctx, n.ContextGroup, n.Config, n.Identity, n.Peerstore)
if err != nil {
return debugerror.Wrap(err)
}
n.PeerHost = peerhost
// setup diagnostics service
n.Diagnostics = diag.NewDiagnostics(n.Identity, n.PeerHost)
// setup routing service
dhtRouting := dht.NewDHT(ctx, n.PeerHost, n.Datastore)
dhtRouting.Validators[IpnsValidatorTag] = namesys.ValidateIpnsRecord
dhtRouting, err := constructDHTRouting(ctx, n.ContextGroup, n.PeerHost, n.Datastore)
if err != nil {
return debugerror.Wrap(err)
}
n.DHT = dhtRouting
n.Routing = dhtRouting
n.AddChildGroup(dhtRouting)
// setup exchange service
const alwaysSendToPeer = true // use YesManStrategy
......@@ -178,35 +235,17 @@ func (n *IpfsNode) StartOnlineServices() error {
// an Exchange, Network, or Routing component and have the constructor
// manage the wiring. In that scenario, this dangling function is a bit
// awkward.
go superviseConnections(ctx, n.PeerHost, dhtRouting, n.Peerstore, n.Config.Bootstrap)
return nil
}
func (n *IpfsNode) startNetwork() error {
ctx := n.Context()
// setup the network
listenAddrs, err := listenAddresses(n.Config)
var bootstrapPeers []peer.PeerInfo
for _, bootstrap := range n.Config.Bootstrap {
p, err := toPeer(bootstrap)
if err != nil {
return debugerror.Wrap(err)
}
// make sure we dont error out if our config includes some addresses we cant use.
listenAddrs = swarm.FilterAddrs(listenAddrs)
network, err := swarm.NewNetwork(ctx, listenAddrs, n.Identity, n.Peerstore)
if err != nil {
return debugerror.Wrap(err)
log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", n.Identity, err)
return err
}
n.AddChildGroup(network.CtxGroup())
n.PeerHost = p2pbhost.New(network)
// explicitly set these as our listen addrs.
// (why not do it inside inet.NewNetwork? because this way we can
// listen on addresses without necessarily advertising those publicly.)
addrs, err := n.PeerHost.Network().InterfaceListenAddresses()
if err != nil {
return debugerror.Wrap(err)
bootstrapPeers = append(bootstrapPeers, p)
}
n.Peerstore.AddAddresses(n.Identity, addrs)
go superviseConnections(ctx, n.PeerHost, n.DHT, n.Peerstore, bootstrapPeers)
return nil
}
......@@ -218,7 +257,27 @@ func (n *IpfsNode) teardown() error {
}
func (n *IpfsNode) OnlineMode() bool {
return n.onlineMode
switch n.mode {
case onlineMode:
return true
default:
return false
}
}
func (n *IpfsNode) Resolve(k util.Key) (*merkledag.Node, error) {
return (&path.Resolver{n.DAG}).ResolvePath(k.String())
}
// Bootstrap is undefined when node is not in OnlineMode
func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error {
// TODO what should return value be when in offlineMode?
if n.DHT != nil {
return bootstrap(ctx, n.PeerHost, n.DHT, n.Peerstore, peers)
}
return nil
}
func (n *IpfsNode) loadID() error {
......@@ -289,3 +348,36 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
return listen, nil
}
// isolates the complex initialization steps
func constructPeerHost(ctx context.Context, ctxg ctxgroup.ContextGroup, cfg *config.Config, id peer.ID, ps peer.Peerstore) (p2phost.Host, error) {
listenAddrs, err := listenAddresses(cfg)
// make sure we dont error out if our config includes some addresses we cant use.
filteredAddrs := swarm.FilterAddrs(listenAddrs)
if err != nil {
return nil, debugerror.Wrap(err)
}
network, err := swarm.NewNetwork(ctx, filteredAddrs, id, ps)
if err != nil {
return nil, debugerror.Wrap(err)
}
ctxg.AddChildGroup(network.CtxGroup())
peerhost := p2pbhost.New(network)
// explicitly set these as our listen addrs.
// (why not do it inside inet.NewNetwork? because this way we can
// listen on addresses without necessarily advertising those publicly.)
addrs, err := peerhost.Network().InterfaceListenAddresses()
if err != nil {
return nil, debugerror.Wrap(err)
}
ps.AddAddresses(id, addrs)
return peerhost, nil
}
func constructDHTRouting(ctx context.Context, ctxg ctxgroup.ContextGroup, host p2phost.Host, ds datastore.ThreadSafeDatastore) (*dht.IpfsDHT, error) {
dhtRouting := dht.NewDHT(ctx, host, ds)
dhtRouting.Validators[IpnsValidatorTag] = namesys.ValidateIpnsRecord
ctxg.AddChildGroup(dhtRouting)
return dhtRouting, nil
}
......@@ -45,14 +45,14 @@ func TestInitialization(t *testing.T) {
}
for i, c := range good {
n, err := NewIpfsNode(ctx, c, false)
n, err := NewIPFSNode(ctx, Standard(c, false))
if n == nil || err != nil {
t.Error("Should have constructed.", i, err)
}
}
for i, c := range bad {
n, err := NewIpfsNode(ctx, c, false)
n, err := NewIPFSNode(ctx, Standard(c, false))
if n != nil || err == nil {
t.Error("Should have failed to construct.", i)
}
......
package core_io
// TODO rename package to something that doesn't conflict with io/ioutil.
// Pretty names are hard to find.
//
// Candidates:
//
// go-ipfs/core/unix
// go-ipfs/core/io
// go-ipfs/core/ioutil
// go-ipfs/core/coreio
// go-ipfs/core/coreunix
import (
"io"
core "github.com/jbenet/go-ipfs/core"
importer "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
u "github.com/jbenet/go-ipfs/util"
)
func Add(n *core.IpfsNode, r io.Reader) (u.Key, error) {
// TODO more attractive function signature importer.BuildDagFromReader
dagNode, err := importer.BuildDagFromReader(
r,
n.DAG,
nil,
chunk.DefaultSplitter,
)
if err != nil {
return "", err
}
return dagNode.Key()
}
package core_io
// TODO rename package to something that doesn't conflict with io/ioutil.
// Pretty names are hard to find.
//
// Candidates:
//
// go-ipfs/core/unix
// go-ipfs/core/io
// go-ipfs/core/ioutil
// go-ipfs/core/coreio
// go-ipfs/core/coreunix
import (
"io"
core "github.com/jbenet/go-ipfs/core"
uio "github.com/jbenet/go-ipfs/unixfs/io"
u "github.com/jbenet/go-ipfs/util"
)
func Cat(n *core.IpfsNode, k u.Key) (io.Reader, error) {
dagNode, err := n.Resolve(k)
if err != nil {
return nil, err
}
return uio.NewDagReader(dagNode, n.DAG)
}
......@@ -11,14 +11,18 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
random "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random"
"github.com/jbenet/go-ipfs/core"
core_io "github.com/jbenet/go-ipfs/core/io"
mocknet "github.com/jbenet/go-ipfs/p2p/net/mock"
"github.com/jbenet/go-ipfs/p2p/peer"
errors "github.com/jbenet/go-ipfs/util/debugerror"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
const kSeed = 1
func Test1KBInstantaneous(t *testing.T) {
conf := Config{
conf := testutil.LatencyConfig{
NetworkLatency: 0,
RoutingLatency: 0,
BlockstoreLatency: 0,
......@@ -31,7 +35,7 @@ func Test1KBInstantaneous(t *testing.T) {
func TestDegenerateSlowBlockstore(t *testing.T) {
SkipUnlessEpic(t)
conf := Config{BlockstoreLatency: 50 * time.Millisecond}
conf := testutil.LatencyConfig{BlockstoreLatency: 50 * time.Millisecond}
if err := AddCatPowers(conf, 128); err != nil {
t.Fatal(err)
}
......@@ -39,7 +43,7 @@ func TestDegenerateSlowBlockstore(t *testing.T) {
func TestDegenerateSlowNetwork(t *testing.T) {
SkipUnlessEpic(t)
conf := Config{NetworkLatency: 400 * time.Millisecond}
conf := testutil.LatencyConfig{NetworkLatency: 400 * time.Millisecond}
if err := AddCatPowers(conf, 128); err != nil {
t.Fatal(err)
}
......@@ -47,7 +51,7 @@ func TestDegenerateSlowNetwork(t *testing.T) {
func TestDegenerateSlowRouting(t *testing.T) {
SkipUnlessEpic(t)
conf := Config{RoutingLatency: 400 * time.Millisecond}
conf := testutil.LatencyConfig{RoutingLatency: 400 * time.Millisecond}
if err := AddCatPowers(conf, 128); err != nil {
t.Fatal(err)
}
......@@ -55,13 +59,13 @@ func TestDegenerateSlowRouting(t *testing.T) {
func Test100MBMacbookCoastToCoast(t *testing.T) {
SkipUnlessEpic(t)
conf := Config{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow()
conf := testutil.LatencyConfig{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow()
if err := DirectAddCat(RandomBytes(100*1024*1024), conf); err != nil {
t.Fatal(err)
}
}
func AddCatPowers(conf Config, megabytesMax int64) error {
func AddCatPowers(conf testutil.LatencyConfig, megabytesMax int64) error {
var i int64
for i = 1; i < megabytesMax; i = i * 2 {
fmt.Printf("%d MB\n", i)
......@@ -78,7 +82,7 @@ func RandomBytes(n int64) []byte {
return data.Bytes()
}
func DirectAddCat(data []byte, conf Config) error {
func DirectAddCat(data []byte, conf testutil.LatencyConfig) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const numPeers = 2
......@@ -99,24 +103,24 @@ func DirectAddCat(data []byte, conf Config) error {
return errors.New("test initialization error")
}
adder, err := makeCore(ctx, MocknetTestRepo(peers[0], mn.Host(peers[0]), conf))
adder, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[0], mn.Host(peers[0]), conf)))
if err != nil {
return err
}
catter, err := makeCore(ctx, MocknetTestRepo(peers[1], mn.Host(peers[1]), conf))
catter, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[1], mn.Host(peers[1]), conf)))
if err != nil {
return err
}
adder.Bootstrap(ctx, catter.ID())
catter.Bootstrap(ctx, adder.ID())
catter.Bootstrap(ctx, []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)})
adder.Bootstrap(ctx, []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)})
keyAdded, err := adder.Add(bytes.NewReader(data))
keyAdded, err := core_io.Add(adder, bytes.NewReader(data))
if err != nil {
return err
}
readerCatted, err := catter.Cat(keyAdded)
readerCatted, err := core_io.Cat(catter, keyAdded)
if err != nil {
return err
}
......
package epictest
import "testing"
import (
"testing"
func benchmarkAddCat(numBytes int64, conf Config, b *testing.B) {
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
func benchmarkAddCat(numBytes int64, conf testutil.LatencyConfig, b *testing.B) {
b.StopTimer()
b.SetBytes(numBytes)
......@@ -16,7 +20,7 @@ func benchmarkAddCat(numBytes int64, conf Config, b *testing.B) {
}
}
var instant = Config{}.All_Instantaneous()
var instant = testutil.LatencyConfig{}.All_Instantaneous()
func BenchmarkInstantaneousAddCat1KB(b *testing.B) { benchmarkAddCat(1*KB, instant, b) }
func BenchmarkInstantaneousAddCat1MB(b *testing.B) { benchmarkAddCat(1*MB, instant, b) }
......@@ -29,7 +33,7 @@ func BenchmarkInstantaneousAddCat64MB(b *testing.B) { benchmarkAddCat(64*MB, in
func BenchmarkInstantaneousAddCat128MB(b *testing.B) { benchmarkAddCat(128*MB, instant, b) }
func BenchmarkInstantaneousAddCat256MB(b *testing.B) { benchmarkAddCat(256*MB, instant, b) }
var routing = Config{}.Routing_Slow()
var routing = testutil.LatencyConfig{}.Routing_Slow()
func BenchmarkRoutingSlowAddCat1MB(b *testing.B) { benchmarkAddCat(1*MB, routing, b) }
func BenchmarkRoutingSlowAddCat2MB(b *testing.B) { benchmarkAddCat(2*MB, routing, b) }
......@@ -42,7 +46,7 @@ func BenchmarkRoutingSlowAddCat128MB(b *testing.B) { benchmarkAddCat(128*MB, rou
func BenchmarkRoutingSlowAddCat256MB(b *testing.B) { benchmarkAddCat(256*MB, routing, b) }
func BenchmarkRoutingSlowAddCat512MB(b *testing.B) { benchmarkAddCat(512*MB, routing, b) }
var network = Config{}.Network_NYtoSF()
var network = testutil.LatencyConfig{}.Network_NYtoSF()
func BenchmarkNetworkSlowAddCat1MB(b *testing.B) { benchmarkAddCat(1*MB, network, b) }
func BenchmarkNetworkSlowAddCat2MB(b *testing.B) { benchmarkAddCat(2*MB, network, b) }
......@@ -54,7 +58,7 @@ func BenchmarkNetworkSlowAddCat64MB(b *testing.B) { benchmarkAddCat(64*MB, netw
func BenchmarkNetworkSlowAddCat128MB(b *testing.B) { benchmarkAddCat(128*MB, network, b) }
func BenchmarkNetworkSlowAddCat256MB(b *testing.B) { benchmarkAddCat(256*MB, network, b) }
var hdd = Config{}.Blockstore_7200RPM()
var hdd = testutil.LatencyConfig{}.Blockstore_7200RPM()
func BenchmarkBlockstoreSlowAddCat1MB(b *testing.B) { benchmarkAddCat(1*MB, hdd, b) }
func BenchmarkBlockstoreSlowAddCat2MB(b *testing.B) { benchmarkAddCat(2*MB, hdd, b) }
......@@ -66,7 +70,7 @@ func BenchmarkBlockstoreSlowAddCat64MB(b *testing.B) { benchmarkAddCat(64*MB, h
func BenchmarkBlockstoreSlowAddCat128MB(b *testing.B) { benchmarkAddCat(128*MB, hdd, b) }
func BenchmarkBlockstoreSlowAddCat256MB(b *testing.B) { benchmarkAddCat(256*MB, hdd, b) }
var mixed = Config{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow()
var mixed = testutil.LatencyConfig{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow()
func BenchmarkMixedAddCat1MBXX(b *testing.B) { benchmarkAddCat(1*MB, mixed, b) }
func BenchmarkMixedAddCat2MBXX(b *testing.B) { benchmarkAddCat(2*MB, mixed, b) }
......
package epictest
import (
"io"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
blockservice "github.com/jbenet/go-ipfs/blockservice"
exchange "github.com/jbenet/go-ipfs/exchange"
core "github.com/jbenet/go-ipfs/core"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
importer "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
merkledag "github.com/jbenet/go-ipfs/merkledag"
host "github.com/jbenet/go-ipfs/p2p/host"
peer "github.com/jbenet/go-ipfs/p2p/peer"
path "github.com/jbenet/go-ipfs/path"
dht "github.com/jbenet/go-ipfs/routing/dht"
uio "github.com/jbenet/go-ipfs/unixfs/io"
util "github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/datastore2"
delay "github.com/jbenet/go-ipfs/util/delay"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
var log = eventlog.Logger("epictest")
// TODO merge with core.IpfsNode
type core struct {
repo Repo
blockService *blockservice.BlockService
blockstore blockstore.Blockstore
dag merkledag.DAGService
id peer.ID
}
func (c *core) ID() peer.ID {
return c.repo.ID()
}
func (c *core) Bootstrap(ctx context.Context, p peer.ID) error {
return c.repo.Bootstrap(ctx, p)
}
func (c *core) Cat(k util.Key) (io.Reader, error) {
catterdag := c.dag
nodeCatted, err := (&path.Resolver{catterdag}).ResolvePath(k.String())
if err != nil {
return nil, err
}
return uio.NewDagReader(nodeCatted, catterdag)
}
func (c *core) Add(r io.Reader) (util.Key, error) {
nodeAdded, err := importer.BuildDagFromReader(
r,
c.dag,
nil,
chunk.DefaultSplitter,
)
if err != nil {
return "", err
}
return nodeAdded.Key()
}
func makeCore(ctx context.Context, rf RepoFactory) (*core, error) {
repo, err := rf(ctx)
if err != nil {
return nil, err
}
bss, err := blockservice.New(repo.Blockstore(), repo.Exchange())
if err != nil {
return nil, err
}
dag := merkledag.NewDAGService(bss)
// to make sure nothing is omitted, init each individual field and assign
// all at once at the bottom.
return &core{
repo: repo,
blockService: bss,
dag: dag,
}, nil
}
type RepoFactory func(ctx context.Context) (Repo, error)
type Repo interface {
ID() peer.ID
Blockstore() blockstore.Blockstore
Exchange() exchange.Interface
Bootstrap(ctx context.Context, peer peer.ID) error
}
type repo struct {
// DHT, Exchange, Network,Datastore
bitSwapNetwork bsnet.BitSwapNetwork
blockstore blockstore.Blockstore
exchange exchange.Interface
datastore datastore.ThreadSafeDatastore
host host.Host
dht *dht.IpfsDHT
id peer.ID
}
func (r *repo) ID() peer.ID {
return r.id
}
func (c *repo) Bootstrap(ctx context.Context, p peer.ID) error {
return c.dht.Connect(ctx, p)
}
func (r *repo) Datastore() datastore.ThreadSafeDatastore {
return r.datastore
}
func (r *repo) Blockstore() blockstore.Blockstore {
return r.blockstore
}
func (r *repo) Exchange() exchange.Interface {
return r.exchange
}
func MocknetTestRepo(p peer.ID, h host.Host, conf Config) RepoFactory {
return func(ctx context.Context) (Repo, error) {
func MocknetTestRepo(p peer.ID, h host.Host, conf testutil.LatencyConfig) core.ConfigOption {
return func(ctx context.Context) (*core.IpfsNode, error) {
const kWriteCacheElems = 100
const alwaysSendToPeer = true
dsDelay := delay.Fixed(conf.BlockstoreLatency)
ds := sync.MutexWrap(datastore2.WithDelay(datastore.NewMapDatastore(), dsDelay))
ds := datastore2.CloserWrap(sync.MutexWrap(datastore2.WithDelay(datastore.NewMapDatastore(), dsDelay)))
log.Debugf("MocknetTestRepo: %s %s %s", p, h.ID(), h)
dhtt := dht.NewDHT(ctx, h, ds)
......@@ -145,14 +35,15 @@ func MocknetTestRepo(p peer.ID, h host.Host, conf Config) RepoFactory {
return nil, err
}
exch := bitswap.New(ctx, p, bsn, bstore, alwaysSendToPeer)
return &repo{
bitSwapNetwork: bsn,
blockstore: bstore,
exchange: exch,
datastore: ds,
host: h,
dht: dhtt,
id: p,
return &core.IpfsNode{
Peerstore: h.Peerstore(),
Blockstore: bstore,
Exchange: exch,
Datastore: ds,
PeerHost: h,
Routing: dhtt,
Identity: p,
DHT: dhtt,
}, nil
}
}
package epictest
import "time"
type Config struct {
BlockstoreLatency time.Duration
NetworkLatency time.Duration
RoutingLatency time.Duration
}
func (c Config) All_Instantaneous() Config {
// Could use a zero value but whatever. Consistency of interface
c.NetworkLatency = 0
c.RoutingLatency = 0
c.BlockstoreLatency = 0
return c
}
func (c Config) Network_NYtoSF() Config {
c.NetworkLatency = 20 * time.Millisecond
return c
}
func (c Config) Network_IntraDatacenter2014() Config {
c.NetworkLatency = 250 * time.Microsecond
return c
}
func (c Config) Blockstore_FastSSD2014() Config {
const iops = 100000
c.BlockstoreLatency = (1 / iops) * time.Second
return c
}
func (c Config) Blockstore_SlowSSD2014() Config {
c.BlockstoreLatency = 150 * time.Microsecond
return c
}
func (c Config) Blockstore_7200RPM() Config {
c.BlockstoreLatency = 8 * time.Millisecond
return c
}
func (c Config) Routing_Slow() Config {
c.RoutingLatency = 200 * time.Millisecond
return c
}
......@@ -7,12 +7,16 @@ import (
"testing"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
core "github.com/jbenet/go-ipfs/core"
core_io "github.com/jbenet/go-ipfs/core/io"
mocknet "github.com/jbenet/go-ipfs/p2p/net/mock"
"github.com/jbenet/go-ipfs/p2p/peer"
errors "github.com/jbenet/go-ipfs/util/debugerror"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
func TestThreeLeggedCat(t *testing.T) {
conf := Config{
conf := testutil.LatencyConfig{
NetworkLatency: 0,
RoutingLatency: 0,
BlockstoreLatency: 0,
......@@ -22,7 +26,7 @@ func TestThreeLeggedCat(t *testing.T) {
}
}
func RunThreeLeggedCat(data []byte, conf Config) error {
func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const numPeers = 3
......@@ -42,28 +46,28 @@ func RunThreeLeggedCat(data []byte, conf Config) error {
if len(peers) < numPeers {
return errors.New("test initialization error")
}
adder, err := makeCore(ctx, MocknetTestRepo(peers[0], mn.Host(peers[0]), conf))
adder, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[0], mn.Host(peers[0]), conf)))
if err != nil {
return err
}
catter, err := makeCore(ctx, MocknetTestRepo(peers[1], mn.Host(peers[1]), conf))
catter, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[1], mn.Host(peers[1]), conf)))
if err != nil {
return err
}
bootstrap, err := makeCore(ctx, MocknetTestRepo(peers[2], mn.Host(peers[2]), conf))
bootstrap, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[2], mn.Host(peers[2]), conf)))
if err != nil {
return err
}
boostrapInfo := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID())
adder.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo})
catter.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo})
adder.Bootstrap(ctx, bootstrap.ID())
catter.Bootstrap(ctx, bootstrap.ID())
keyAdded, err := adder.Add(bytes.NewReader(data))
keyAdded, err := core_io.Add(adder, bytes.NewReader(data))
if err != nil {
return err
}
readerCatted, err := catter.Cat(keyAdded)
readerCatted, err := core_io.Cat(catter, keyAdded)
if err != nil {
return err
}
......
package testutil
import "time"
type LatencyConfig struct {
BlockstoreLatency time.Duration
NetworkLatency time.Duration
RoutingLatency time.Duration
}
func (c LatencyConfig) All_Instantaneous() LatencyConfig {
// Could use a zero value but whatever. Consistency of interface
c.NetworkLatency = 0
c.RoutingLatency = 0
c.BlockstoreLatency = 0
return c
}
func (c LatencyConfig) Network_NYtoSF() LatencyConfig {
c.NetworkLatency = 20 * time.Millisecond
return c
}
func (c LatencyConfig) Network_IntraDatacenter2014() LatencyConfig {
c.NetworkLatency = 250 * time.Microsecond
return c
}
func (c LatencyConfig) Blockstore_FastSSD2014() LatencyConfig {
const iops = 100000
c.BlockstoreLatency = (1 / iops) * time.Second
return c
}
func (c LatencyConfig) Blockstore_SlowSSD2014() LatencyConfig {
c.BlockstoreLatency = 150 * time.Microsecond
return c
}
func (c LatencyConfig) Blockstore_7200RPM() LatencyConfig {
c.BlockstoreLatency = 8 * time.Millisecond
return c
}
func (c LatencyConfig) Routing_Slow() LatencyConfig {
c.RoutingLatency = 200 * time.Millisecond
return c
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment