mux.go 4.38 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4
package mux

import (
	"errors"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5
	"sync"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6 7 8 9

	msg "github.com/jbenet/go-ipfs/net/message"
	u "github.com/jbenet/go-ipfs/util"

10 11
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
)

// Protocol objects produce + consume raw data. They are added to the Muxer
// with a ProtocolID, which is added to outgoing payloads. Muxer properly
// encapsulates and decapsulates when interfacing with its Protocols. The
// Protocols do not encounter their ProtocolID.
type Protocol interface {
	GetPipe() *msg.Pipe
}

// ProtocolMap maps ProtocolIDs to Protocols.
type ProtocolMap map[ProtocolID]Protocol

// Muxer is a simple multiplexor that reads + writes to Incoming and Outgoing
// channels. It multiplexes various protocols, wrapping and unwrapping data
// with a ProtocolID.
type Muxer struct {
	// Protocols are the multiplexed services.
	Protocols ProtocolMap

	// cancel is the function to stop the Muxer
	cancel context.CancelFunc
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34 35
	ctx    context.Context
	wg     sync.WaitGroup
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36 37 38 39

	*msg.Pipe
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40 41 42 43 44 45 46 47
// NewMuxer constructs a muxer given a protocol map.
func NewMuxer(mp ProtocolMap) *Muxer {
	return &Muxer{
		Protocols: mp,
		Pipe:      msg.NewPipe(10),
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48 49 50 51 52 53 54
// GetPipe implements the Protocol interface
func (m *Muxer) GetPipe() *msg.Pipe {
	return m.Pipe
}

// Start kicks off the Muxer goroutines.
func (m *Muxer) Start(ctx context.Context) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55 56 57 58
	if m == nil {
		panic("nix muxer")
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59 60 61 62 63
	if m.cancel != nil {
		return errors.New("Muxer already started.")
	}

	// make a cancellable context.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65
	m.ctx, m.cancel = context.WithCancel(ctx)
	m.wg = sync.WaitGroup{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67 68
	m.wg.Add(1)
	go m.handleIncomingMessages()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69
	for pid, proto := range m.Protocols {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70 71
		m.wg.Add(1)
		go m.handleOutgoingMessages(pid, proto)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72 73 74 75 76 77 78
	}

	return nil
}

// Stop stops muxer activity.
func (m *Muxer) Stop() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79 80 81 82
	if m.cancel == nil {
		panic("muxer stopped twice.")
	}
	// issue cancel, and wipe func.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83 84
	m.cancel()
	m.cancel = context.CancelFunc(nil)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
85 86 87

	// wait for everything to wind down.
	m.wg.Wait()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88 89 90 91 92 93 94 95 96 97 98 99 100 101
}

// AddProtocol adds a Protocol with given ProtocolID to the Muxer.
func (m *Muxer) AddProtocol(p Protocol, pid ProtocolID) error {
	if _, found := m.Protocols[pid]; found {
		return errors.New("Another protocol already using this ProtocolID")
	}

	m.Protocols[pid] = p
	return nil
}

// handleIncoming consumes the messages on the m.Incoming channel and
// routes them appropriately (to the protocols).
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102 103
func (m *Muxer) handleIncomingMessages() {
	defer m.wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105
	for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
106
		if m == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107
			panic("nil muxer")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108 109
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110
		select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
111 112 113 114
		case msg, more := <-m.Incoming:
			if !more {
				return
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
115
			go m.handleIncomingMessage(msg)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117
		case <-m.ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118 119 120 121 122 123
			return
		}
	}
}

// handleIncomingMessage routes message to the appropriate protocol.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
124
func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125

126
	data, pid, err := unwrapData(m1.Data())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127 128 129 130 131
	if err != nil {
		u.PErr("muxer de-serializing error: %v\n", err)
		return
	}

132
	m2 := msg.New(m1.Peer(), data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
133 134 135 136 137 138 139 140
	proto, found := m.Protocols[pid]
	if !found {
		u.PErr("muxer unknown protocol %v\n", pid)
		return
	}

	select {
	case proto.GetPipe().Incoming <- m2:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141 142
	case <-m.ctx.Done():
		u.PErr("%v\n", m.ctx.Err())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
143 144 145 146 147 148
		return
	}
}

// handleOutgoingMessages consumes the messages on the proto.Outgoing channel,
// wraps them and sends them out.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149 150 151
func (m *Muxer) handleOutgoingMessages(pid ProtocolID, proto Protocol) {
	defer m.wg.Done()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152 153
	for {
		select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154 155 156 157
		case msg, more := <-proto.GetPipe().Outgoing:
			if !more {
				return
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
			go m.handleOutgoingMessage(pid, msg)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
		case <-m.ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161 162 163 164 165 166
			return
		}
	}
}

// handleOutgoingMessage wraps out a message and sends it out the
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167
func (m *Muxer) handleOutgoingMessage(pid ProtocolID, m1 msg.NetMessage) {
168
	data, err := wrapData(m1.Data(), pid)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169 170 171 172 173
	if err != nil {
		u.PErr("muxer serializing error: %v\n", err)
		return
	}

174
	m2 := msg.New(m1.Peer(), data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
175 176
	select {
	case m.GetPipe().Outgoing <- m2:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
177
	case <-m.ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
		return
	}
}

func wrapData(data []byte, pid ProtocolID) ([]byte, error) {
	// Marshal
	pbm := new(PBProtocolMessage)
	pbm.ProtocolID = &pid
	pbm.Data = data
	b, err := proto.Marshal(pbm)
	if err != nil {
		return nil, err
	}

	return b, nil
}

func unwrapData(data []byte) ([]byte, ProtocolID, error) {
	// Unmarshal
	pbm := new(PBProtocolMessage)
	err := proto.Unmarshal(data, pbm)
	if err != nil {
		return nil, 0, err
	}

	return pbm.GetData(), pbm.GetProtocolID(), nil
}