diff --git a/core/core.go b/core/core.go index 98afce99435631e6577bceae22b167c32d0892af..d22390d9296427bd68b32620bcd666106add65a7 100644 --- a/core/core.go +++ b/core/core.go @@ -126,7 +126,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { return nil, err } - net, err = inet.NewIpfsNetwork(context.TODO(), local, peerstore, &mux.ProtocolMap{ + net, err = inet.NewIpfsNetwork(ctx, local, peerstore, &mux.ProtocolMap{ mux.ProtocolID_Routing: dhtService, mux.ProtocolID_Exchange: exchangeService, mux.ProtocolID_Diagnostic: diagService, @@ -137,14 +137,14 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { } diagnostics = diag.NewDiagnostics(local, net, diagService) - diagService.Handler = diagnostics + diagService.SetHandler(diagnostics) route = dht.NewDHT(local, peerstore, net, dhtService, d) // TODO(brian): perform this inside NewDHT factory method - dhtService.Handler = route // wire the handler to the service. + dhtService.SetHandler(route) // wire the handler to the service. const alwaysSendToPeer = true // use YesManStrategy - exchangeSession = bitswap.NetMessageSession(ctx, local, exchangeService, route, d, alwaysSendToPeer) + exchangeSession = bitswap.NetMessageSession(ctx, local, net, exchangeService, route, d, alwaysSendToPeer) // TODO(brian): pass a context to initConnections go initConnections(ctx, cfg, peerstore, route) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 819100cfefc90751c5295110d32b5f80aa9d0e81..7eb8870aa5013f9e0db227c5ab7d234402a93d43 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -11,6 +11,7 @@ import ( bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications" strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy" + inet "github.com/jbenet/go-ipfs/net" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ) @@ -19,14 +20,17 @@ var log = u.Logger("bitswap") // NetMessageSession initializes a BitSwap session that communicates over the // provided NetMessage service -func NetMessageSession(parent context.Context, p *peer.Peer, s bsnet.NetMessageService, directory bsnet.Routing, d ds.Datastore, nice bool) exchange.Interface { +func NetMessageSession(parent context.Context, p *peer.Peer, + net inet.Network, srv inet.Service, directory bsnet.Routing, + d ds.Datastore, nice bool) exchange.Interface { - networkAdapter := bsnet.NetMessageAdapter(s, nil) + networkAdapter := bsnet.NetMessageAdapter(srv, nil) bs := &bitswap{ blockstore: blockstore.NewBlockstore(d), notifications: notifications.New(), strategy: strategy.New(nice), routing: directory, + network: net, sender: networkAdapter, wantlist: u.NewKeySet(), } @@ -38,6 +42,9 @@ func NetMessageSession(parent context.Context, p *peer.Peer, s bsnet.NetMessageS // bitswap instances implement the bitswap protocol. type bitswap struct { + // network maintains connections to the outside world. + network inet.Network + // sender delivers messages on behalf of the session sender bsnet.Adapter @@ -79,7 +86,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) } message.AppendWanted(k) for iiiii := range peersToQuery { - // log.Debug("bitswap got peersToQuery: %s", iiiii) + log.Debug("bitswap got peersToQuery: %s", iiiii) go func(p *peer.Peer) { response, err := bs.sender.SendRequest(ctx, p, message) if err != nil { diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 611dea8cbcc75627f3162b724548b7068140a0a4..8985ecefc30a67d5f2440f9258819e280ce37141 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -2,10 +2,8 @@ package network import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - netservice "github.com/jbenet/go-ipfs/net/service" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" - netmsg "github.com/jbenet/go-ipfs/net/message" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ) @@ -38,13 +36,6 @@ type Receiver interface { ReceiveError(error) } -// TODO(brian): move this to go-ipfs/net package -type NetMessageService interface { - SendRequest(ctx context.Context, m netmsg.NetMessage) (netmsg.NetMessage, error) - SendMessage(ctx context.Context, m netmsg.NetMessage) error - SetHandler(netservice.Handler) -} - // TODO rename -> Router? type Routing interface { // FindProvidersAsync returns a channel of providers for the given key diff --git a/exchange/bitswap/network/net_message_adapter.go b/exchange/bitswap/network/net_message_adapter.go index fe3bd6a36e664f3800fb764eef1993cdf886edba..a95e566ccaa87a62e38799109806e6e0599e5af9 100644 --- a/exchange/bitswap/network/net_message_adapter.go +++ b/exchange/bitswap/network/net_message_adapter.go @@ -4,12 +4,13 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" + inet "github.com/jbenet/go-ipfs/net" netmsg "github.com/jbenet/go-ipfs/net/message" peer "github.com/jbenet/go-ipfs/peer" ) // NetMessageAdapter wraps a NetMessage network service -func NetMessageAdapter(s NetMessageService, r Receiver) Adapter { +func NetMessageAdapter(s inet.Service, r Receiver) Adapter { adapter := impl{ nms: s, receiver: r, @@ -20,7 +21,7 @@ func NetMessageAdapter(s NetMessageService, r Receiver) Adapter { // implements an Adapter that integrates with a NetMessage network service type impl struct { - nms NetMessageService + nms inet.Service // inbound messages from the network are forwarded to the receiver receiver Receiver diff --git a/net/interface.go b/net/interface.go index 76b4f423726ea209a19c95eaf233032c3cbea045..dee1460fc8188889f8abf22355c4d64674bd7e20 100644 --- a/net/interface.go +++ b/net/interface.go @@ -5,8 +5,6 @@ import ( mux "github.com/jbenet/go-ipfs/net/mux" srv "github.com/jbenet/go-ipfs/net/service" peer "github.com/jbenet/go-ipfs/peer" - - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ) // Network is the interface IPFS uses for connecting to the world. @@ -39,14 +37,10 @@ type Network interface { } // Sender interface for network services. -type Sender interface { - // SendMessage sends out a given message, without expecting a response. - SendMessage(ctx context.Context, m msg.NetMessage) error - - // SendRequest sends out a given message, and awaits a response. - // Set Deadlines or cancellations in the context.Context you pass in. - SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) -} +type Sender srv.Sender // Handler interface for network services. type Handler srv.Handler + +// Service interface for network resources. +type Service srv.Service diff --git a/net/service/service.go b/net/service/service.go index de17c1d5b5747986adc673d14747b411cb0941da..c43b10ffaa79a08eb5775afa0561532e91510b8b 100644 --- a/net/service/service.go +++ b/net/service/service.go @@ -25,9 +25,35 @@ type Handler interface { HandleMessage(context.Context, msg.NetMessage) msg.NetMessage } +// Sender interface for network services. +type Sender interface { + // SendMessage sends out a given message, without expecting a response. + SendMessage(ctx context.Context, m msg.NetMessage) error + + // SendRequest sends out a given message, and awaits a response. + // Set Deadlines or cancellations in the context.Context you pass in. + SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) +} + +// Service is an interface for a net resource with both outgoing (sender) and +// incomig (SetHandler) requests. +type Service interface { + Sender + + // Start + Stop Service + Start(ctx context.Context) error + Stop() + + // GetPipe + GetPipe() *msg.Pipe + + // SetHandler assigns the request Handler for this service. + SetHandler(Handler) +} + // Service is a networking component that protocols can use to multiplex // messages over the same channel, and to issue + handle requests. -type Service struct { +type service struct { // Handler is the object registered to handle incoming requests. Handler Handler @@ -43,8 +69,8 @@ type Service struct { } // NewService creates a service object with given type ID and Handler -func NewService(h Handler) *Service { - return &Service{ +func NewService(h Handler) Service { + return &service{ Handler: h, Requests: RequestMap{}, Pipe: msg.NewPipe(10), @@ -52,7 +78,7 @@ func NewService(h Handler) *Service { } // Start kicks off the Service goroutines. -func (s *Service) Start(ctx context.Context) error { +func (s *service) Start(ctx context.Context) error { if s.cancel != nil { return errors.New("Service already started.") } @@ -65,18 +91,18 @@ func (s *Service) Start(ctx context.Context) error { } // Stop stops Service activity. -func (s *Service) Stop() { +func (s *service) Stop() { s.cancel() s.cancel = context.CancelFunc(nil) } // GetPipe implements the mux.Protocol interface -func (s *Service) GetPipe() *msg.Pipe { +func (s *service) GetPipe() *msg.Pipe { return s.Pipe } // sendMessage sends a message out (actual leg work. SendMessage is to export w/o rid) -func (s *Service) sendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error { +func (s *service) sendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error { // serialize ServiceMessage wrapper data, err := wrapData(m.Data(), rid) @@ -98,12 +124,12 @@ func (s *Service) sendMessage(ctx context.Context, m msg.NetMessage, rid Request } // SendMessage sends a message out -func (s *Service) SendMessage(ctx context.Context, m msg.NetMessage) error { +func (s *service) SendMessage(ctx context.Context, m msg.NetMessage) error { return s.sendMessage(ctx, m, nil) } // SendRequest sends a request message out and awaits a response. -func (s *Service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) { +func (s *service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) { // create a request r, err := NewRequest(m.Peer().ID) @@ -151,7 +177,7 @@ func (s *Service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMes // handleIncoming consumes the messages on the s.Incoming channel and // routes them appropriately (to requests, or handler). -func (s *Service) handleIncomingMessages(ctx context.Context) { +func (s *service) handleIncomingMessages(ctx context.Context) { for { select { case m, more := <-s.Incoming: @@ -166,7 +192,7 @@ func (s *Service) handleIncomingMessages(ctx context.Context) { } } -func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) { +func (s *service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) { // unwrap the incoming message data, rid, err := unwrapData(m.Data()) @@ -217,6 +243,6 @@ func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) { } // SetHandler assigns the request Handler for this service. -func (s *Service) SetHandler(h Handler) { +func (s *service) SetHandler(h Handler) { s.Handler = h }