Commit c70e717c authored by Brian Tiger Chow

Merge pull request #578 from jbenet/feat/core+repo

feat(core) replace config.Config with repo.Repo
parents fce04f44 797aad91
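
The change replaces direct use of *config.Config with the repo.Repo abstraction: callers open an fsrepo at the repo root and hand it to the node constructor, which then owns the repo. A minimal sketch of the new pattern, assembled from the hunks below (ctx and repoRoot are assumed to be in scope):

    r := fsrepo.At(repoRoot)
    if err := r.Open(); err != nil { // NB: the repo is owned by the node once constructed
        return err
    }
    nd, err := core.NewIPFSNode(ctx, core.Online(r)) // or core.Offline(r)
    if err != nil {
        return err
    }
    defer nd.Close()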
......@@ -10,8 +10,7 @@ import (
cmds "github.com/jbenet/go-ipfs/commands"
core "github.com/jbenet/go-ipfs/core"
corecmds "github.com/jbenet/go-ipfs/core/commands"
imp "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
coreunix "github.com/jbenet/go-ipfs/core/coreunix"
ci "github.com/jbenet/go-ipfs/p2p/crypto"
peer "github.com/jbenet/go-ipfs/p2p/peer"
repo "github.com/jbenet/go-ipfs/repo"
......@@ -106,7 +105,7 @@ func doInit(repoRoot string, force bool, nBitsForKeypair int) (interface{}, erro
if err := repo.ConfigureEventLogger(conf.Logs); err != nil {
return nil, err
}
err = addTheWelcomeFile(conf)
err = addTheWelcomeFile(repoRoot)
if err != nil {
return nil, err
}
......@@ -115,26 +114,23 @@ func doInit(repoRoot string, force bool, nBitsForKeypair int) (interface{}, erro
}
// addTheWelcomeFile adds a file containing the welcome message to the newly
// minted node. On success, it calls onSuccess
func addTheWelcomeFile(conf *config.Config) error {
// TODO extract this file creation operation into a function
// minted node.
func addTheWelcomeFile(repoRoot string) error {
ctx, cancel := context.WithCancel(context.Background())
nd, err := core.NewIPFSNode(ctx, core.Offline(conf))
defer cancel()
r := fsrepo.At(repoRoot)
if err := r.Open(); err != nil { // NB: repo is owned by the node
return err
}
nd, err := core.NewIPFSNode(ctx, core.Offline(r))
if err != nil {
return err
}
defer nd.Close()
defer cancel()
// Set up default file
reader := bytes.NewBufferString(welcomeMsg)
defnd, err := imp.BuildDagFromReader(reader, nd.DAG, nd.Pinning.GetManual(), chunk.DefaultSplitter)
if err != nil {
return err
}
k, err := defnd.Key()
k, err := coreunix.Add(nd, reader)
if err != nil {
return fmt.Errorf("failed to write test file: %s", err)
}
......
......@@ -181,15 +181,19 @@ func (i *cmdInvocation) constructNodeFunc(ctx context.Context) func() (*core.Ipf
return nil, errors.New("constructing node without a request context")
}
cfg, err := cmdctx.GetConfig()
if err != nil {
return nil, fmt.Errorf("constructing node without a config: %s", err)
r := fsrepo.At(i.req.Context().ConfigRoot)
if err := r.Open(); err != nil { // repo is owned by the node
return nil, err
}
// ok everything is good. set it on the invocation (for ownership)
// and return it.
i.node, err = core.NewIPFSNode(ctx, core.Standard(cfg, cmdctx.Online))
return i.node, err
n, err := core.NewIPFSNode(ctx, core.Standard(r, cmdctx.Online))
if err != nil {
return nil, err
}
i.node = n
return i.node, nil
}
}
......
......@@ -2,6 +2,7 @@ package core
import (
"fmt"
"io"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
......@@ -29,11 +30,11 @@ import (
peer "github.com/jbenet/go-ipfs/p2p/peer"
path "github.com/jbenet/go-ipfs/path"
pin "github.com/jbenet/go-ipfs/pin"
repo "github.com/jbenet/go-ipfs/repo"
config "github.com/jbenet/go-ipfs/repo/config"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
util "github.com/jbenet/go-ipfs/util"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
......@@ -41,6 +42,7 @@ import (
const IpnsValidatorTag = "ipns"
const kSizeBlockstoreWriteCache = 100
const kReprovideFrequency = time.Hour * 12
var log = eventlog.Logger("core")
......@@ -59,9 +61,7 @@ type IpfsNode struct {
// Self
Identity peer.ID // the local node's identity
// TODO abstract as repo.Repo
Config *config.Config // the node's configuration
Datastore ds2.ThreadSafeDatastoreCloser // the local datastore
Repo repo.Repo
// Local node
Pinning pin.Pinner // the pinning manager
......@@ -85,10 +85,6 @@ type IpfsNode struct {
ctxgroup.ContextGroup
// dht allows node to Bootstrap when dht is present
// TODO privatize before merging. This is here temporarily during the
// migration of the TestNet constructor
DHT *dht.IpfsDHT
mode mode
}
......@@ -102,11 +98,22 @@ type Mounts struct {
type ConfigOption func(ctx context.Context) (*IpfsNode, error)
func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) {
func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error) {
ctxg := ctxgroup.WithContext(parent)
ctx := ctxg.Context()
success := false // flip to true after all sub-system inits succeed
defer func() {
if !success {
ctxg.Close()
}
}()
node, err := option(ctx)
if err != nil {
return nil, err
}
node.ContextGroup = ctxg
ctxg.SetTeardown(node.teardown)
// Need to make sure it's perfectly clear 1) which variables are expected
// to be initialized at this point, and 2) which variables will be
......@@ -120,35 +127,41 @@ func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) {
node.Peerstore = peer.NewPeerstore()
}
node.DAG = merkledag.NewDAGService(node.Blocks)
node.Pinning, err = pin.LoadPinner(node.Datastore, node.DAG)
node.Pinning, err = pin.LoadPinner(node.Repo.Datastore(), node.DAG)
if err != nil {
node.Pinning = pin.NewPinner(node.Datastore, node.DAG)
node.Pinning = pin.NewPinner(node.Repo.Datastore(), node.DAG)
}
node.Resolver = &path.Resolver{DAG: node.DAG}
success = true
return node, nil
}
func Offline(cfg *config.Config) ConfigOption {
return Standard(cfg, false)
func Offline(r repo.Repo) ConfigOption {
return Standard(r, false)
}
func Online(cfg *config.Config) ConfigOption {
return Standard(cfg, true)
func Online(r repo.Repo) ConfigOption {
return Standard(r, true)
}
// DEPRECATED: use Online, Offline functions
func Standard(cfg *config.Config, online bool) ConfigOption {
func Standard(r repo.Repo, online bool) ConfigOption {
return func(ctx context.Context) (n *IpfsNode, err error) {
success := false // flip to true after all sub-system inits succeed
// FIXME perform node construction in the main constructor so it isn't
// necessary to perform this teardown in this scope.
success := false
defer func() {
if !success && n != nil {
n.Close()
n.teardown()
}
}()
if cfg == nil {
return nil, debugerror.Errorf("configuration required")
// TODO move as much of node initialization as possible into
// NewIPFSNode. The larger these config options are, the harder it is
// to test all node construction code paths.
if r == nil {
return nil, debugerror.Errorf("repo required")
}
n = &IpfsNode{
mode: func() mode {
......@@ -157,34 +170,25 @@ func Standard(cfg *config.Config, online bool) ConfigOption {
}
return offlineMode
}(),
Config: cfg,
Repo: r,
}
n.ContextGroup = ctxgroup.WithContextAndTeardown(ctx, n.teardown)
ctx = n.ContextGroup.Context()
// setup Peerstore
n.Peerstore = peer.NewPeerstore()
// setup datastore.
if n.Datastore, err = makeDatastore(cfg.Datastore); err != nil {
return nil, debugerror.Wrap(err)
}
// setup local peer ID (private key is loaded in online setup)
if err := n.loadID(); err != nil {
return nil, err
}
n.Blockstore, err = bstore.WriteCached(bstore.NewBlockstore(n.Datastore), kSizeBlockstoreWriteCache)
n.Blockstore, err = bstore.WriteCached(bstore.NewBlockstore(n.Repo.Datastore()), kSizeBlockstoreWriteCache)
if err != nil {
return nil, debugerror.Wrap(err)
}
// setup online services
if online {
if err := n.StartOnlineServices(); err != nil {
return nil, err // debugerror.Wraps.
if err := n.StartOnlineServices(ctx); err != nil {
return nil, err
}
} else {
n.Exchange = offline.Exchange(n.Blockstore)
......@@ -195,8 +199,7 @@ func Standard(cfg *config.Config, online bool) ConfigOption {
}
}
func (n *IpfsNode) StartOnlineServices() error {
ctx := n.Context()
func (n *IpfsNode) StartOnlineServices(ctx context.Context) error {
if n.PeerHost != nil { // already online.
return debugerror.New("node already online")
......@@ -207,7 +210,7 @@ func (n *IpfsNode) StartOnlineServices() error {
return err
}
peerhost, err := constructPeerHost(ctx, n.ContextGroup, n.Config, n.Identity, n.Peerstore)
peerhost, err := constructPeerHost(ctx, n.Repo.Config(), n.Identity, n.Peerstore)
if err != nil {
return debugerror.Wrap(err)
}
......@@ -217,11 +220,10 @@ func (n *IpfsNode) StartOnlineServices() error {
n.Diagnostics = diag.NewDiagnostics(n.Identity, n.PeerHost)
// setup routing service
dhtRouting, err := constructDHTRouting(ctx, n.ContextGroup, n.PeerHost, n.Datastore)
dhtRouting, err := constructDHTRouting(ctx, n.PeerHost, n.Repo.Datastore())
if err != nil {
return debugerror.Wrap(err)
}
n.DHT = dhtRouting
n.Routing = dhtRouting
// setup exchange service
......@@ -240,7 +242,7 @@ func (n *IpfsNode) StartOnlineServices() error {
// manage the wiring. In that scenario, this dangling function is a bit
// awkward.
var bootstrapPeers []peer.PeerInfo
for _, bootstrap := range n.Config.Bootstrap {
for _, bootstrap := range n.Repo.Config().Bootstrap {
p, err := toPeer(bootstrap)
if err != nil {
log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err))
......@@ -249,17 +251,40 @@ func (n *IpfsNode) StartOnlineServices() error {
}
bootstrapPeers = append(bootstrapPeers, p)
}
go superviseConnections(ctx, n.PeerHost, n.DHT, n.Peerstore, bootstrapPeers)
// Start up reprovider system
go superviseConnections(ctx, n.PeerHost, dhtRouting, n.Peerstore, bootstrapPeers)
n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore)
go n.Reprovider.ProvideEvery(ctx, time.Hour*12)
go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency)
return nil
}
// teardown closes owned children. If any errors occur, this function returns
// the first error.
func (n *IpfsNode) teardown() error {
if err := n.Datastore.Close(); err != nil {
return err
// owned objects are closed in this teardown to ensure that they're closed
// regardless of which constructor was used to add them to the node.
var closers []io.Closer
if n.Repo != nil {
closers = append(closers, n.Repo)
}
if n.Routing != nil {
if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
closers = append(closers, dht)
}
}
if n.PeerHost != nil {
closers = append(closers, n.PeerHost)
}
var errs []error
for _, closer := range closers {
if err := closer.Close(); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errs[0]
}
return nil
}
......@@ -273,6 +298,8 @@ func (n *IpfsNode) OnlineMode() bool {
}
}
// TODO expose way to resolve path name
func (n *IpfsNode) Resolve(k util.Key) (*merkledag.Node, error) {
return (&path.Resolver{n.DAG}).ResolvePath(k.String())
}
......@@ -282,8 +309,10 @@ func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error {
// TODO what should return value be when in offlineMode?
if n.DHT != nil {
return bootstrap(ctx, n.PeerHost, n.DHT, n.Peerstore, peers)
if n.Routing != nil {
if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
return bootstrap(ctx, n.PeerHost, dht, n.Peerstore, peers)
}
}
return nil
}
......@@ -293,7 +322,7 @@ func (n *IpfsNode) loadID() error {
return debugerror.New("identity already loaded")
}
cid := n.Config.Identity.PeerID
cid := n.Repo.Config().Identity.PeerID
if cid == "" {
return debugerror.New("Identity was not set in config (was ipfs init run?)")
}
......@@ -314,7 +343,7 @@ func (n *IpfsNode) loadPrivateKey() error {
return debugerror.New("private key already loaded")
}
sk, err := loadPrivateKey(&n.Config.Identity, n.Identity)
sk, err := loadPrivateKey(&n.Repo.Config().Identity, n.Identity)
if err != nil {
return err
}
......@@ -358,7 +387,7 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
}
// isolates the complex initialization steps
func constructPeerHost(ctx context.Context, ctxg ctxgroup.ContextGroup, cfg *config.Config, id peer.ID, ps peer.Peerstore) (p2phost.Host, error) {
func constructPeerHost(ctx context.Context, cfg *config.Config, id peer.ID, ps peer.Peerstore) (p2phost.Host, error) {
listenAddrs, err := listenAddresses(cfg)
if err != nil {
return nil, debugerror.Wrap(err)
......@@ -376,7 +405,6 @@ func constructPeerHost(ctx context.Context, ctxg ctxgroup.ContextGroup, cfg *con
if err != nil {
return nil, debugerror.Wrap(err)
}
ctxg.AddChildGroup(network.CtxGroup())
peerhost := p2pbhost.New(network)
// explicitly set these as our listen addrs.
......@@ -391,9 +419,8 @@ func constructPeerHost(ctx context.Context, ctxg ctxgroup.ContextGroup, cfg *con
return peerhost, nil
}
func constructDHTRouting(ctx context.Context, ctxg ctxgroup.ContextGroup, host p2phost.Host, ds datastore.ThreadSafeDatastore) (*dht.IpfsDHT, error) {
func constructDHTRouting(ctx context.Context, host p2phost.Host, ds datastore.ThreadSafeDatastore) (*dht.IpfsDHT, error) {
dhtRouting := dht.NewDHT(ctx, host, ds)
dhtRouting.Validators[IpnsValidatorTag] = namesys.ValidateIpnsRecord
ctxg.AddChildGroup(dhtRouting)
return dhtRouting, nil
}
......@@ -4,7 +4,9 @@ import (
"testing"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/repo"
config "github.com/jbenet/go-ipfs/repo/config"
"github.com/jbenet/go-ipfs/util/testutil"
)
func TestInitialization(t *testing.T) {
......@@ -42,14 +44,22 @@ func TestInitialization(t *testing.T) {
}
for i, c := range good {
n, err := NewIPFSNode(ctx, Standard(c, false))
r := &repo.Mock{
C: *c,
D: testutil.ThreadSafeCloserMapDatastore(),
}
n, err := NewIPFSNode(ctx, Standard(r, false))
if n == nil || err != nil {
t.Error("Should have constructed.", i, err)
}
}
for i, c := range bad {
n, err := NewIPFSNode(ctx, Standard(c, false))
r := &repo.Mock{
C: *c,
D: testutil.ThreadSafeCloserMapDatastore(),
}
n, err := NewIPFSNode(ctx, Standard(r, false))
if n != nil || err == nil {
t.Error("Should have failed to construct.", i)
}
......
package core_io
// TODO rename package to something that doesn't conflict with io/ioutil.
// Pretty names are hard to find.
//
// Candidates:
//
// go-ipfs/core/unix
// go-ipfs/core/io
// go-ipfs/core/ioutil
// go-ipfs/core/coreio
// go-ipfs/core/coreunix
package coreunix
import (
"io"
......@@ -20,12 +9,14 @@ import (
u "github.com/jbenet/go-ipfs/util"
)
// Add builds a merkledag from a reader, pinning all objects to the local
// datastore. Returns a key representing the root node.
func Add(n *core.IpfsNode, r io.Reader) (u.Key, error) {
// TODO more attractive function signature importer.BuildDagFromReader
dagNode, err := importer.BuildDagFromReader(
r,
n.DAG,
nil,
n.Pinning.GetManual(), // Fix this interface
chunk.DefaultSplitter,
)
if err != nil {
......
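
For reference, a rough usage sketch of coreunix.Add, mirroring how the epictest changes below call it (n is an already-constructed *core.IpfsNode; data is a []byte):

    key, err := coreunix.Add(n, bytes.NewReader(data))
    if err != nil {
        return err
    }
    fmt.Println("added root:", key) // key names the root node of the built DAG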
package core_io
// TODO rename package to something that doesn't conflict with io/ioutil.
// Pretty names are hard to find.
//
// Candidates:
//
// go-ipfs/core/unix
// go-ipfs/core/io
// go-ipfs/core/ioutil
// go-ipfs/core/coreio
// go-ipfs/core/coreunix
package coreunix
import (
"io"
......
......@@ -2,8 +2,7 @@ package core
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks/blockstore"
blockservice "github.com/jbenet/go-ipfs/blockservice"
......@@ -13,6 +12,7 @@ import (
mocknet "github.com/jbenet/go-ipfs/p2p/net/mock"
peer "github.com/jbenet/go-ipfs/p2p/peer"
path "github.com/jbenet/go-ipfs/path"
"github.com/jbenet/go-ipfs/repo"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
testutil "github.com/jbenet/go-ipfs/util/testutil"
......@@ -46,14 +46,16 @@ func NewMockNode() (*IpfsNode, error) {
}
// Temp Datastore
dstore := ds.NewMapDatastore()
nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore))
nd.Repo = &repo.Mock{
// TODO C: conf,
D: ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore())),
}
// Routing
nd.Routing = mockrouting.NewServer().Client(ident)
// Bitswap
bstore := blockstore.NewBlockstore(nd.Datastore)
bstore := blockstore.NewBlockstore(nd.Repo.Datastore())
bserv, err := blockservice.New(bstore, offline.Exchange(bstore))
if err != nil {
return nil, err
......
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. $<
clean:
rm *.pb.go
// Code generated by protoc-gen-gogo.
// source: spipe.proto
// DO NOT EDIT!
/*
Package spipe_pb is a generated protocol buffer package.
It is generated from these files:
spipe.proto
It has these top-level messages:
Propose
Exchange
DataSig
*/
package spipe_pb
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
type Propose struct {
Rand []byte `protobuf:"bytes,1,opt,name=rand" json:"rand,omitempty"`
Pubkey []byte `protobuf:"bytes,2,opt,name=pubkey" json:"pubkey,omitempty"`
Exchanges *string `protobuf:"bytes,3,opt,name=exchanges" json:"exchanges,omitempty"`
Ciphers *string `protobuf:"bytes,4,opt,name=ciphers" json:"ciphers,omitempty"`
Hashes *string `protobuf:"bytes,5,opt,name=hashes" json:"hashes,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Propose) Reset() { *m = Propose{} }
func (m *Propose) String() string { return proto.CompactTextString(m) }
func (*Propose) ProtoMessage() {}
func (m *Propose) GetRand() []byte {
if m != nil {
return m.Rand
}
return nil
}
func (m *Propose) GetPubkey() []byte {
if m != nil {
return m.Pubkey
}
return nil
}
func (m *Propose) GetExchanges() string {
if m != nil && m.Exchanges != nil {
return *m.Exchanges
}
return ""
}
func (m *Propose) GetCiphers() string {
if m != nil && m.Ciphers != nil {
return *m.Ciphers
}
return ""
}
func (m *Propose) GetHashes() string {
if m != nil && m.Hashes != nil {
return *m.Hashes
}
return ""
}
type Exchange struct {
Epubkey []byte `protobuf:"bytes,1,opt,name=epubkey" json:"epubkey,omitempty"`
Signature []byte `protobuf:"bytes,2,opt,name=signature" json:"signature,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Exchange) Reset() { *m = Exchange{} }
func (m *Exchange) String() string { return proto.CompactTextString(m) }
func (*Exchange) ProtoMessage() {}
func (m *Exchange) GetEpubkey() []byte {
if m != nil {
return m.Epubkey
}
return nil
}
func (m *Exchange) GetSignature() []byte {
if m != nil {
return m.Signature
}
return nil
}
type DataSig struct {
Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
Signature []byte `protobuf:"bytes,2,opt,name=signature" json:"signature,omitempty"`
Id *uint64 `protobuf:"varint,3,opt,name=id" json:"id,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DataSig) Reset() { *m = DataSig{} }
func (m *DataSig) String() string { return proto.CompactTextString(m) }
func (*DataSig) ProtoMessage() {}
func (m *DataSig) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func (m *DataSig) GetSignature() []byte {
if m != nil {
return m.Signature
}
return nil
}
func (m *DataSig) GetId() uint64 {
if m != nil && m.Id != nil {
return *m.Id
}
return 0
}
func init() {
}
package spipe.pb;
message Propose {
optional bytes rand = 1;
optional bytes pubkey = 2;
optional string exchanges = 3;
optional string ciphers = 4;
optional string hashes = 5;
}
message Exchange {
optional bytes epubkey = 1;
optional bytes signature = 2;
}
......@@ -116,7 +116,6 @@ func (m *Mux) HandleSync(s inet.Stream) {
}
log.Infof("muxer handle protocol: %s", name)
log.Event(ctx, "muxHandle", eventlog.Metadata{"protocol": name})
handler(s)
}
......
package repo
import (
"errors"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/jbenet/go-ipfs/repo/config"
)
var errTODO = errors.New("TODO")
// Mock is not thread-safe
type Mock struct {
C config.Config
D ds.ThreadSafeDatastore
}
func (m *Mock) Config() *config.Config {
return &m.C // FIXME threadsafety
}
func (m *Mock) SetConfig(updated *config.Config) error {
m.C = *updated // FIXME threadsafety
return nil
}
func (m *Mock) SetConfigKey(key string, value interface{}) error {
return errTODO
}
func (m *Mock) GetConfigKey(key string) (interface{}, error) {
return nil, errTODO
}
func (m *Mock) Datastore() ds.ThreadSafeDatastore { return m.D }
func (m *Mock) Close() error { return errTODO }
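
As a usage sketch (mirroring the TestInitialization changes above), a test can wrap a config and an in-memory datastore in repo.Mock and pass it straight to the node constructors; cfg here is a *config.Config supplied by the test:

    r := &repo.Mock{
        C: *cfg,
        D: testutil.ThreadSafeCloserMapDatastore(),
    }
    n, err := core.NewIPFSNode(ctx, core.Standard(r, false))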
package repo
import (
"io"
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
config "github.com/jbenet/go-ipfs/repo/config"
util "github.com/jbenet/go-ipfs/util"
......@@ -14,6 +16,8 @@ type Repo interface {
GetConfigKey(key string) (interface{}, error)
Datastore() datastore.ThreadSafeDatastore
io.Closer
}
// IsInitialized returns true if the path is home to an initialized IPFS
......
......@@ -12,7 +12,7 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
random "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random"
"github.com/jbenet/go-ipfs/core"
core_io "github.com/jbenet/go-ipfs/core/io"
coreunix "github.com/jbenet/go-ipfs/core/coreunix"
mocknet "github.com/jbenet/go-ipfs/p2p/net/mock"
"github.com/jbenet/go-ipfs/p2p/peer"
errors "github.com/jbenet/go-ipfs/util/debugerror"
......@@ -115,12 +115,12 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error {
catter.Bootstrap(ctx, []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)})
adder.Bootstrap(ctx, []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)})
keyAdded, err := core_io.Add(adder, bytes.NewReader(data))
keyAdded, err := coreunix.Add(adder, bytes.NewReader(data))
if err != nil {
return err
}
readerCatted, err := core_io.Cat(catter, keyAdded)
readerCatted, err := coreunix.Cat(catter, keyAdded)
if err != nil {
return err
}
......
......@@ -2,19 +2,19 @@ package epictest
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
core "github.com/jbenet/go-ipfs/core"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
host "github.com/jbenet/go-ipfs/p2p/host"
peer "github.com/jbenet/go-ipfs/p2p/peer"
"github.com/jbenet/go-ipfs/repo"
dht "github.com/jbenet/go-ipfs/routing/dht"
delay "github.com/jbenet/go-ipfs/thirdparty/delay"
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
"github.com/jbenet/go-ipfs/util/datastore2"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
......@@ -25,7 +25,10 @@ func MocknetTestRepo(p peer.ID, h host.Host, conf testutil.LatencyConfig) core.C
const kWriteCacheElems = 100
const alwaysSendToPeer = true
dsDelay := delay.Fixed(conf.BlockstoreLatency)
ds := datastore2.CloserWrap(sync.MutexWrap(datastore2.WithDelay(datastore.NewMapDatastore(), dsDelay)))
r := &repo.Mock{
D: ds2.CloserWrap(syncds.MutexWrap(ds2.WithDelay(datastore.NewMapDatastore(), dsDelay))),
}
ds := r.Datastore()
log.Debugf("MocknetTestRepo: %s %s %s", p, h.ID(), h)
dhtt := dht.NewDHT(ctx, h, ds)
......@@ -39,11 +42,10 @@ func MocknetTestRepo(p peer.ID, h host.Host, conf testutil.LatencyConfig) core.C
Peerstore: h.Peerstore(),
Blockstore: bstore,
Exchange: exch,
Datastore: ds,
Repo: r,
PeerHost: h,
Routing: dhtt,
Identity: p,
DHT: dhtt,
}, nil
}
}
......@@ -8,7 +8,7 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
core "github.com/jbenet/go-ipfs/core"
core_io "github.com/jbenet/go-ipfs/core/io"
coreunix "github.com/jbenet/go-ipfs/core/coreunix"
mocknet "github.com/jbenet/go-ipfs/p2p/net/mock"
"github.com/jbenet/go-ipfs/p2p/peer"
errors "github.com/jbenet/go-ipfs/util/debugerror"
......@@ -62,12 +62,12 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
adder.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo})
catter.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo})
keyAdded, err := core_io.Add(adder, bytes.NewReader(data))
keyAdded, err := coreunix.Add(adder, bytes.NewReader(data))
if err != nil {
return err
}
readerCatted, err := core_io.Cat(catter, keyAdded)
readerCatted, err := coreunix.Cat(catter, keyAdded)
if err != nil {
return err
}
......
package testutil
import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
)
func ThreadSafeCloserMapDatastore() ds2.ThreadSafeDatastoreCloser {
return ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore()))
}