Commit 338b0372 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

clean up and add inet.Network to bitswap

new Service interface
parent da95db3b
......@@ -126,7 +126,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
return nil, err
}
net, err = inet.NewIpfsNetwork(context.TODO(), local, peerstore, &mux.ProtocolMap{
net, err = inet.NewIpfsNetwork(ctx, local, peerstore, &mux.ProtocolMap{
mux.ProtocolID_Routing: dhtService,
mux.ProtocolID_Exchange: exchangeService,
mux.ProtocolID_Diagnostic: diagService,
......@@ -137,14 +137,14 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
}
diagnostics = diag.NewDiagnostics(local, net, diagService)
diagService.Handler = diagnostics
diagService.SetHandler(diagnostics)
route = dht.NewDHT(local, peerstore, net, dhtService, d)
// TODO(brian): perform this inside NewDHT factory method
dhtService.Handler = route // wire the handler to the service.
dhtService.SetHandler(route) // wire the handler to the service.
const alwaysSendToPeer = true // use YesManStrategy
exchangeSession = bitswap.NetMessageSession(ctx, local, exchangeService, route, d, alwaysSendToPeer)
exchangeSession = bitswap.NetMessageSession(ctx, local, net, exchangeService, route, d, alwaysSendToPeer)
// TODO(brian): pass a context to initConnections
go initConnections(ctx, cfg, peerstore, route)
......
......@@ -11,6 +11,7 @@ import (
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
......@@ -19,14 +20,17 @@ var log = u.Logger("bitswap")
// NetMessageSession initializes a BitSwap session that communicates over the
// provided NetMessage service
func NetMessageSession(parent context.Context, p *peer.Peer, s bsnet.NetMessageService, directory bsnet.Routing, d ds.Datastore, nice bool) exchange.Interface {
func NetMessageSession(parent context.Context, p *peer.Peer,
net inet.Network, srv inet.Service, directory bsnet.Routing,
d ds.Datastore, nice bool) exchange.Interface {
networkAdapter := bsnet.NetMessageAdapter(s, nil)
networkAdapter := bsnet.NetMessageAdapter(srv, nil)
bs := &bitswap{
blockstore: blockstore.NewBlockstore(d),
notifications: notifications.New(),
strategy: strategy.New(nice),
routing: directory,
network: net,
sender: networkAdapter,
wantlist: u.NewKeySet(),
}
......@@ -38,6 +42,9 @@ func NetMessageSession(parent context.Context, p *peer.Peer, s bsnet.NetMessageS
// bitswap instances implement the bitswap protocol.
type bitswap struct {
// network maintains connections to the outside world.
network inet.Network
// sender delivers messages on behalf of the session
sender bsnet.Adapter
......@@ -79,7 +86,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
}
message.AppendWanted(k)
for iiiii := range peersToQuery {
// log.Debug("bitswap got peersToQuery: %s", iiiii)
log.Debug("bitswap got peersToQuery: %s", iiiii)
go func(p *peer.Peer) {
response, err := bs.sender.SendRequest(ctx, p, message)
if err != nil {
......
......@@ -2,10 +2,8 @@ package network
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
netservice "github.com/jbenet/go-ipfs/net/service"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
......@@ -38,13 +36,6 @@ type Receiver interface {
ReceiveError(error)
}
// TODO(brian): move this to go-ipfs/net package
type NetMessageService interface {
SendRequest(ctx context.Context, m netmsg.NetMessage) (netmsg.NetMessage, error)
SendMessage(ctx context.Context, m netmsg.NetMessage) error
SetHandler(netservice.Handler)
}
// TODO rename -> Router?
type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key
......
......@@ -4,12 +4,13 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
inet "github.com/jbenet/go-ipfs/net"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
)
// NetMessageAdapter wraps a NetMessage network service
func NetMessageAdapter(s NetMessageService, r Receiver) Adapter {
func NetMessageAdapter(s inet.Service, r Receiver) Adapter {
adapter := impl{
nms: s,
receiver: r,
......@@ -20,7 +21,7 @@ func NetMessageAdapter(s NetMessageService, r Receiver) Adapter {
// implements an Adapter that integrates with a NetMessage network service
type impl struct {
nms NetMessageService
nms inet.Service
// inbound messages from the network are forwarded to the receiver
receiver Receiver
......
......@@ -5,8 +5,6 @@ import (
mux "github.com/jbenet/go-ipfs/net/mux"
srv "github.com/jbenet/go-ipfs/net/service"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
// Network is the interface IPFS uses for connecting to the world.
......@@ -39,14 +37,10 @@ type Network interface {
}
// Sender interface for network services.
type Sender interface {
// SendMessage sends out a given message, without expecting a response.
SendMessage(ctx context.Context, m msg.NetMessage) error
// SendRequest sends out a given message, and awaits a response.
// Set Deadlines or cancellations in the context.Context you pass in.
SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error)
}
type Sender srv.Sender
// Handler interface for network services.
type Handler srv.Handler
// Service interface for network resources.
type Service srv.Service
......@@ -25,9 +25,35 @@ type Handler interface {
HandleMessage(context.Context, msg.NetMessage) msg.NetMessage
}
// Sender interface for network services.
type Sender interface {
// SendMessage sends out a given message, without expecting a response.
SendMessage(ctx context.Context, m msg.NetMessage) error
// SendRequest sends out a given message, and awaits a response.
// Set Deadlines or cancellations in the context.Context you pass in.
SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error)
}
// Service is an interface for a net resource with both outgoing (sender) and
// incomig (SetHandler) requests.
type Service interface {
Sender
// Start + Stop Service
Start(ctx context.Context) error
Stop()
// GetPipe
GetPipe() *msg.Pipe
// SetHandler assigns the request Handler for this service.
SetHandler(Handler)
}
// Service is a networking component that protocols can use to multiplex
// messages over the same channel, and to issue + handle requests.
type Service struct {
type service struct {
// Handler is the object registered to handle incoming requests.
Handler Handler
......@@ -43,8 +69,8 @@ type Service struct {
}
// NewService creates a service object with given type ID and Handler
func NewService(h Handler) *Service {
return &Service{
func NewService(h Handler) Service {
return &service{
Handler: h,
Requests: RequestMap{},
Pipe: msg.NewPipe(10),
......@@ -52,7 +78,7 @@ func NewService(h Handler) *Service {
}
// Start kicks off the Service goroutines.
func (s *Service) Start(ctx context.Context) error {
func (s *service) Start(ctx context.Context) error {
if s.cancel != nil {
return errors.New("Service already started.")
}
......@@ -65,18 +91,18 @@ func (s *Service) Start(ctx context.Context) error {
}
// Stop stops Service activity.
func (s *Service) Stop() {
func (s *service) Stop() {
s.cancel()
s.cancel = context.CancelFunc(nil)
}
// GetPipe implements the mux.Protocol interface
func (s *Service) GetPipe() *msg.Pipe {
func (s *service) GetPipe() *msg.Pipe {
return s.Pipe
}
// sendMessage sends a message out (actual leg work. SendMessage is to export w/o rid)
func (s *Service) sendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error {
func (s *service) sendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error {
// serialize ServiceMessage wrapper
data, err := wrapData(m.Data(), rid)
......@@ -98,12 +124,12 @@ func (s *Service) sendMessage(ctx context.Context, m msg.NetMessage, rid Request
}
// SendMessage sends a message out
func (s *Service) SendMessage(ctx context.Context, m msg.NetMessage) error {
func (s *service) SendMessage(ctx context.Context, m msg.NetMessage) error {
return s.sendMessage(ctx, m, nil)
}
// SendRequest sends a request message out and awaits a response.
func (s *Service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
func (s *service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
// create a request
r, err := NewRequest(m.Peer().ID)
......@@ -151,7 +177,7 @@ func (s *Service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMes
// handleIncoming consumes the messages on the s.Incoming channel and
// routes them appropriately (to requests, or handler).
func (s *Service) handleIncomingMessages(ctx context.Context) {
func (s *service) handleIncomingMessages(ctx context.Context) {
for {
select {
case m, more := <-s.Incoming:
......@@ -166,7 +192,7 @@ func (s *Service) handleIncomingMessages(ctx context.Context) {
}
}
func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) {
func (s *service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) {
// unwrap the incoming message
data, rid, err := unwrapData(m.Data())
......@@ -217,6 +243,6 @@ func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) {
}
// SetHandler assigns the request Handler for this service.
func (s *Service) SetHandler(h Handler) {
func (s *service) SetHandler(h Handler) {
s.Handler = h
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment