service.go 4.91 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4 5 6 7 8 9
package service

import (
	"errors"
	"sync"

	msg "github.com/jbenet/go-ipfs/net/message"
	u "github.com/jbenet/go-ipfs/util"

10
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11 12
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13 14
var log = u.Logger("service")

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16 17 18
// ErrNoResponse is returned by Service when a Request did not get a response,
// and no other error happened
var ErrNoResponse = errors.New("no response to request")

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20 21 22 23 24
// Handler is an interface that objects must implement in order to handle
// a service's requests.
type Handler interface {

	// HandleMessage receives an incoming message, and potentially returns
	// a response message to send back.
25
	HandleMessage(context.Context, msg.NetMessage) msg.NetMessage
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
}

// Service is a networking component that protocols can use to multiplex
// messages over the same channel, and to issue + handle requests.
type Service struct {
	// Handler is the object registered to handle incoming requests.
	Handler Handler

	// Requests are all the pending requests on this service.
	Requests     RequestMap
	RequestsLock sync.RWMutex

	// cancel is the function to stop the Service
	cancel context.CancelFunc

	// Message Pipe (connected to the outside world)
	*msg.Pipe
}

// NewService creates a service object with given type ID and Handler
46 47
func NewService(h Handler) *Service {
	return &Service{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
		Handler:  h,
		Requests: RequestMap{},
		Pipe:     msg.NewPipe(10),
	}
}

// Start kicks off the Service goroutines.
func (s *Service) Start(ctx context.Context) error {
	if s.cancel != nil {
		return errors.New("Service already started.")
	}

	// make a cancellable context.
	ctx, s.cancel = context.WithCancel(ctx)

	go s.handleIncomingMessages(ctx)
	return nil
}

// Stop stops Service activity.
func (s *Service) Stop() {
	s.cancel()
	s.cancel = context.CancelFunc(nil)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73 74 75 76 77
// GetPipe implements the mux.Protocol interface
func (s *Service) GetPipe() *msg.Pipe {
	return s.Pipe
}

78 79
// sendMessage sends a message out (actual leg work. SendMessage is to export w/o rid)
func (s *Service) sendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80 81

	// serialize ServiceMessage wrapper
82
	data, err := wrapData(m.Data(), rid)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83 84 85 86
	if err != nil {
		return err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87
	// log.Debug("Service send message [to = %s]", m.Peer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
89
	// send message
90
	m2 := msg.New(m.Peer(), data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
91 92 93 94 95 96 97 98 99
	select {
	case s.Outgoing <- m2:
	case <-ctx.Done():
		return ctx.Err()
	}

	return nil
}

100 101 102 103 104
// SendMessage sends a message out
func (s *Service) SendMessage(ctx context.Context, m msg.NetMessage) error {
	return s.sendMessage(ctx, m, nil)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105
// SendRequest sends a request message out and awaits a response.
106
func (s *Service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107 108

	// create a request
109
	r, err := NewRequest(m.Peer().ID)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
	if err != nil {
		return nil, err
	}

	// register Request
	s.RequestsLock.Lock()
	s.Requests[r.Key()] = r
	s.RequestsLock.Unlock()

	// defer deleting this request
	defer func() {
		s.RequestsLock.Lock()
		delete(s.Requests, r.Key())
		s.RequestsLock.Unlock()
	}()

	// check if we should bail after waiting for mutex
	select {
	default:
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// Send message
134
	s.sendMessage(ctx, m, r.ID)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135 136 137 138 139 140 141 142 143 144

	// wait for response
	m = nil
	err = nil
	select {
	case m = <-r.Response:
	case <-ctx.Done():
		err = ctx.Err()
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
145 146 147 148
	if m == nil {
		return nil, ErrNoResponse
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149 150 151 152 153 154 155 156
	return m, err
}

// handleIncoming consumes the messages on the s.Incoming channel and
// routes them appropriately (to requests, or handler).
func (s *Service) handleIncomingMessages(ctx context.Context) {
	for {
		select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157 158 159 160
		case m, more := <-s.Incoming:
			if !more {
				return
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161 162 163 164 165 166 167 168
			go s.handleIncomingMessage(ctx, m)

		case <-ctx.Done():
			return
		}
	}
}

169
func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170 171

	// unwrap the incoming message
172
	data, rid, err := unwrapData(m.Data())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
173
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174
		log.Error("de-serializing error: %v", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
175
	}
176
	m2 := msg.New(m.Peer(), data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
177 178 179

	// if it's a request (or has no RequestID), handle it
	if rid == nil || rid.IsRequest() {
180
		if s.Handler == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181
			log.Error("service dropped msg: %v", m)
182 183 184
			return // no handler, drop it.
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185
		// should this be "go HandleMessage ... ?"
186
		r1 := s.Handler.HandleMessage(ctx, m2)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187 188 189

		// if handler gave us a response, send it back out!
		if r1 != nil {
190
			err := s.sendMessage(ctx, r1, rid.Response())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192
				log.Error("error sending response message: %v", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193 194 195 196 197 198 199
			}
		}
		return
	}

	// Otherwise, it is a response. handle it.
	if !rid.IsResponse() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
200
		log.Error("RequestID should identify a response here.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
201 202
	}

203
	key := RequestKey(m.Peer().ID, RequestID(rid))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204 205 206 207 208
	s.RequestsLock.RLock()
	r, found := s.Requests[key]
	s.RequestsLock.RUnlock()

	if !found {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209
		log.Error("no request key %v (timeout?)", []byte(key))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
210 211 212 213 214 215 216 217
		return
	}

	select {
	case r.Response <- m2:
	case <-ctx.Done():
	}
}
218

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219
// SetHandler assigns the request Handler for this service.
220 221 222
func (s *Service) SetHandler(h Handler) {
	s.Handler = h
}