Commit abd971d3 authored by Brian Tiger Chow's avatar Brian Tiger Chow

extract initialization

refactor(epictest) Core

refactor: extract repo

fix

move core
parent 217611c2
...@@ -10,27 +10,9 @@ import ( ...@@ -10,27 +10,9 @@ import (
"time" "time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
random "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random" random "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
blockservice "github.com/jbenet/go-ipfs/blockservice"
exchange "github.com/jbenet/go-ipfs/exchange"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
importer "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
merkledag "github.com/jbenet/go-ipfs/merkledag"
net "github.com/jbenet/go-ipfs/net"
mocknet "github.com/jbenet/go-ipfs/net/mock" mocknet "github.com/jbenet/go-ipfs/net/mock"
path "github.com/jbenet/go-ipfs/path"
peer "github.com/jbenet/go-ipfs/peer"
dht "github.com/jbenet/go-ipfs/routing/dht"
uio "github.com/jbenet/go-ipfs/unixfs/io"
util "github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/datastore2"
errors "github.com/jbenet/go-ipfs/util/debugerror" errors "github.com/jbenet/go-ipfs/util/debugerror"
delay "github.com/jbenet/go-ipfs/util/delay"
) )
const kSeed = 1 const kSeed = 1
...@@ -42,7 +24,7 @@ func Test1KBInstantaneous(t *testing.T) { ...@@ -42,7 +24,7 @@ func Test1KBInstantaneous(t *testing.T) {
BlockstoreLatency: 0, BlockstoreLatency: 0,
} }
if err := AddCatBytes(RandomBytes(100*MB), conf); err != nil { if err := AddCatBytes(RandomBytes(1*KB), conf); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
...@@ -96,23 +78,10 @@ func RandomBytes(n int64) []byte { ...@@ -96,23 +78,10 @@ func RandomBytes(n int64) []byte {
return data.Bytes() return data.Bytes()
} }
type instance struct {
ID peer.ID
Network net.Network
Blockstore blockstore.Blockstore
Datastore datastore.ThreadSafeDatastore
DHT *dht.IpfsDHT
Exchange exchange.Interface
BitSwapNetwork bsnet.BitSwapNetwork
datastoreDelay delay.D
}
func AddCatBytes(data []byte, conf Config) error { func AddCatBytes(data []byte, conf Config) error {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
const numPeers = 2 const numPeers = 2
instances := make(map[peer.ID]*instance, numPeers)
// create network // create network
mn, err := mocknet.FullMeshLinked(ctx, numPeers) mn, err := mocknet.FullMeshLinked(ctx, numPeers)
...@@ -124,57 +93,28 @@ func AddCatBytes(data []byte, conf Config) error { ...@@ -124,57 +93,28 @@ func AddCatBytes(data []byte, conf Config) error {
// TODO add to conf. This is tricky because we want 0 values to be functional. // TODO add to conf. This is tricky because we want 0 values to be functional.
Bandwidth: math.MaxInt32, Bandwidth: math.MaxInt32,
}) })
for _, p := range mn.Peers() {
instances[p] = &instance{
ID: p,
Network: mn.Net(p),
}
}
// create dht network if len(mn.Peers()) < numPeers {
for _, p := range mn.Peers() { return errors.New("test initialization error")
dsDelay := delay.Fixed(conf.BlockstoreLatency)
instances[p].Datastore = sync.MutexWrap(datastore2.WithDelay(datastore.NewMapDatastore(), dsDelay))
instances[p].datastoreDelay = dsDelay
}
for _, p := range mn.Peers() {
instances[p].DHT = dht.NewDHT(ctx, p, instances[p].Network, instances[p].Datastore)
} }
// create two bitswap network clients adder, err := makeCore(ctx, MocknetTestRepo(mn.Peers()[0], mn.Net(mn.Peers()[0]), conf))
for _, p := range mn.Peers() { if err != nil {
instances[p].BitSwapNetwork = bsnet.NewFromIpfsNetwork(instances[p].Network, instances[p].DHT) return err
}
for _, p := range mn.Peers() {
const kWriteCacheElems = 100
const alwaysSendToPeer = true
adapter := instances[p].BitSwapNetwork
dstore := instances[p].Datastore
instances[p].Blockstore, err = blockstore.WriteCached(blockstore.NewBlockstore(dstore), kWriteCacheElems)
if err != nil {
return err
}
instances[p].Exchange = bitswap.New(ctx, p, adapter, instances[p].Blockstore, alwaysSendToPeer)
} }
var peers []peer.ID catter, err := makeCore(ctx, MocknetTestRepo(mn.Peers()[1], mn.Net(mn.Peers()[1]), conf))
for _, p := range mn.Peers() { if err != nil {
peers = append(peers, p) return err
} }
adder := instances[peers[0]] adder.Bootstrap(ctx, catter.ID())
catter := instances[peers[1]] catter.Bootstrap(ctx, adder.ID())
// bootstrap the DHTs
adder.DHT.Connect(ctx, catter.ID)
catter.DHT.Connect(ctx, adder.ID)
adder.datastoreDelay.Set(0) // disable blockstore latency during add operation keyAdded, err := adder.Add(bytes.NewReader(data))
keyAdded, err := add(adder, bytes.NewReader(data))
if err != nil { if err != nil {
return err return err
} }
adder.datastoreDelay.Set(conf.BlockstoreLatency) // add some blockstore delay to make the catter wait
readerCatted, err := cat(catter, keyAdded) readerCatted, err := catter.Cat(keyAdded)
if err != nil { if err != nil {
return err return err
} }
...@@ -188,28 +128,6 @@ func AddCatBytes(data []byte, conf Config) error { ...@@ -188,28 +128,6 @@ func AddCatBytes(data []byte, conf Config) error {
return nil return nil
} }
func cat(catter *instance, k util.Key) (io.Reader, error) {
catterdag := merkledag.NewDAGService(&blockservice.BlockService{catter.Blockstore, catter.Exchange})
nodeCatted, err := (&path.Resolver{catterdag}).ResolvePath(k.String())
if err != nil {
return nil, err
}
return uio.NewDagReader(nodeCatted, catterdag)
}
func add(adder *instance, r io.Reader) (util.Key, error) {
nodeAdded, err := importer.BuildDagFromReader(
r,
merkledag.NewDAGService(&blockservice.BlockService{adder.Blockstore, adder.Exchange}),
nil,
chunk.DefaultSplitter,
)
if err != nil {
return "", err
}
return nodeAdded.Key()
}
func SkipUnlessEpic(t *testing.T) { func SkipUnlessEpic(t *testing.T) {
if os.Getenv("IPFS_EPIC_TEST") == "" { if os.Getenv("IPFS_EPIC_TEST") == "" {
t.SkipNow() t.SkipNow()
......
package epictest
import (
"io"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
blockservice "github.com/jbenet/go-ipfs/blockservice"
exchange "github.com/jbenet/go-ipfs/exchange"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
importer "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
merkledag "github.com/jbenet/go-ipfs/merkledag"
net "github.com/jbenet/go-ipfs/net"
path "github.com/jbenet/go-ipfs/path"
peer "github.com/jbenet/go-ipfs/peer"
dht "github.com/jbenet/go-ipfs/routing/dht"
uio "github.com/jbenet/go-ipfs/unixfs/io"
util "github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/datastore2"
delay "github.com/jbenet/go-ipfs/util/delay"
)
// TODO merge with core.IpfsNode
type core struct {
repo Repo
blockService *blockservice.BlockService
blockstore blockstore.Blockstore
dag merkledag.DAGService
id peer.ID
}
func (c *core) ID() peer.ID {
return c.repo.ID()
}
func (c *core) Bootstrap(ctx context.Context, p peer.ID) error {
return c.repo.Bootstrap(ctx, p)
}
func (c *core) Cat(k util.Key) (io.Reader, error) {
catterdag := c.dag
nodeCatted, err := (&path.Resolver{catterdag}).ResolvePath(k.String())
if err != nil {
return nil, err
}
return uio.NewDagReader(nodeCatted, catterdag)
}
func (c *core) Add(r io.Reader) (util.Key, error) {
nodeAdded, err := importer.BuildDagFromReader(
r,
c.dag,
nil,
chunk.DefaultSplitter,
)
if err != nil {
return "", err
}
return nodeAdded.Key()
}
func makeCore(ctx context.Context, rf RepoFactory) (*core, error) {
repo, err := rf(ctx)
if err != nil {
return nil, err
}
bss := &blockservice.BlockService{repo.Blockstore(), repo.Exchange()}
dag := merkledag.NewDAGService(bss)
// to make sure nothing is omitted, init each individual field and assign
// all at once at the bottom.
return &core{
repo: repo,
blockService: bss,
dag: dag,
}, nil
}
type RepoFactory func(ctx context.Context) (Repo, error)
type Repo interface {
ID() peer.ID
Blockstore() blockstore.Blockstore
Exchange() exchange.Interface
Bootstrap(ctx context.Context, peer peer.ID) error
}
type repo struct {
// DHT, Exchange, Network,Datastore
bitSwapNetwork bsnet.BitSwapNetwork
blockstore blockstore.Blockstore
exchange exchange.Interface
datastore datastore.ThreadSafeDatastore
network net.Network
dht *dht.IpfsDHT
id peer.ID
}
func (r *repo) ID() peer.ID {
return r.id
}
func (c *repo) Bootstrap(ctx context.Context, p peer.ID) error {
return c.dht.Connect(ctx, p)
}
func (r *repo) Datastore() datastore.ThreadSafeDatastore {
return r.datastore
}
func (r *repo) Blockstore() blockstore.Blockstore {
return r.blockstore
}
func (r *repo) Exchange() exchange.Interface {
return r.exchange
}
func MocknetTestRepo(p peer.ID, n net.Network, conf Config) RepoFactory {
return func(ctx context.Context) (Repo, error) {
const kWriteCacheElems = 100
const alwaysSendToPeer = true
dsDelay := delay.Fixed(conf.BlockstoreLatency)
ds := sync.MutexWrap(datastore2.WithDelay(datastore.NewMapDatastore(), dsDelay))
dhtt := dht.NewDHT(ctx, p, n, ds)
bsn := bsnet.NewFromIpfsNetwork(n, dhtt)
bstore, err := blockstore.WriteCached(blockstore.NewBlockstore(ds), kWriteCacheElems)
if err != nil {
return nil, err
}
exch := bitswap.New(ctx, p, bsn, bstore, alwaysSendToPeer)
return &repo{
bitSwapNetwork: bsn,
blockstore: bstore,
exchange: exch,
datastore: ds,
network: n,
dht: dhtt,
id: p,
}, nil
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment