net_message_adapter.go 2.39 KB
Newer Older
1 2 3
package network

import (
4 5
	"errors"

6
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Jeromy's avatar
Jeromy committed
7
	"github.com/jbenet/go-ipfs/util"
8 9

	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
10
	inet "github.com/jbenet/go-ipfs/net"
11 12 13 14
	netmsg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
)

Jeromy's avatar
Jeromy committed
15 16
var log = util.Logger("net_message_adapter")

17
// NetMessageAdapter wraps a NetMessage network service
18
func NetMessageAdapter(s inet.Service, n inet.Network, r Receiver) BitSwapNetwork {
19 20
	adapter := impl{
		nms:      s,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21
		net:      n,
22
		receiver: r,
23 24 25 26 27
	}
	s.SetHandler(&adapter)
	return &adapter
}

28 29
// implements an Adapter that integrates with a NetMessage network service
type impl struct {
30
	nms inet.Service
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31
	net inet.Network
32 33 34

	// inbound messages from the network are forwarded to the receiver
	receiver Receiver
35 36 37 38
}

// HandleMessage marshals and unmarshals net messages, forwarding them to the
// BitSwapMessage receiver
39
func (adapter *impl) HandleMessage(
40
	ctx context.Context, incoming netmsg.NetMessage) netmsg.NetMessage {
41 42

	if adapter.receiver == nil {
43
		return nil
44 45 46 47
	}

	received, err := bsmsg.FromNet(incoming)
	if err != nil {
48 49
		go adapter.receiver.ReceiveError(err)
		return nil
50 51
	}

52
	p, bsmsg := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)
53 54 55

	// TODO(brian): put this in a helper function
	if bsmsg == nil || p == nil {
56
		adapter.receiver.ReceiveError(errors.New("ReceiveMessage returned nil peer or message"))
57
		return nil
58 59 60 61
	}

	outgoing, err := bsmsg.ToNet(p)
	if err != nil {
62 63
		go adapter.receiver.ReceiveError(err)
		return nil
64 65
	}

Jeromy's avatar
Jeromy committed
66
	log.Debugf("Message size: %d", len(outgoing.Data()))
67
	return outgoing
68 69
}

70
func (adapter *impl) DialPeer(ctx context.Context, p peer.Peer) error {
71
	return adapter.net.DialPeer(ctx, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72 73
}

74
func (adapter *impl) SendMessage(
75
	ctx context.Context,
76
	p peer.Peer,
77 78 79 80 81 82
	outgoing bsmsg.BitSwapMessage) error {

	nmsg, err := outgoing.ToNet(p)
	if err != nil {
		return err
	}
83
	return adapter.nms.SendMessage(ctx, nmsg)
84 85
}

86
func (adapter *impl) SendRequest(
87
	ctx context.Context,
88
	p peer.Peer,
89 90 91 92 93 94
	outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {

	outgoingMsg, err := outgoing.ToNet(p)
	if err != nil {
		return nil, err
	}
95
	incomingMsg, err := adapter.nms.SendRequest(ctx, outgoingMsg)
96 97 98 99 100 101
	if err != nil {
		return nil, err
	}
	return bsmsg.FromNet(incomingMsg)
}

102
func (adapter *impl) SetDelegate(r Receiver) {
103 104
	adapter.receiver = r
}