service.go 6.08 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4
package service

import (
	"errors"
5
	"fmt"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6 7 8 9
	"sync"

	msg "github.com/jbenet/go-ipfs/net/message"
	u "github.com/jbenet/go-ipfs/util"
10
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11

12
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13 14
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
var log = u.Logger("service")

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18 19 20
// ErrNoResponse is returned by Service when a Request did not get a response,
// and no other error happened
var ErrNoResponse = errors.New("no response to request")

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21 22 23 24 25 26
// Handler is an interface that objects must implement in order to handle
// a service's requests.
type Handler interface {

	// HandleMessage receives an incoming message, and potentially returns
	// a response message to send back.
27
	HandleMessage(context.Context, msg.NetMessage) msg.NetMessage
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28 29
}

30 31 32 33 34 35 36 37 38 39 40 41 42 43
// Sender interface for network services.
type Sender interface {
	// SendMessage sends out a given message, without expecting a response.
	SendMessage(ctx context.Context, m msg.NetMessage) error

	// SendRequest sends out a given message, and awaits a response.
	// Set Deadlines or cancellations in the context.Context you pass in.
	SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error)
}

// Service is an interface for a net resource with both outgoing (sender) and
// incomig (SetHandler) requests.
type Service interface {
	Sender
44
	ctxc.ContextCloser
45 46 47 48 49 50

	// GetPipe
	GetPipe() *msg.Pipe

	// SetHandler assigns the request Handler for this service.
	SetHandler(Handler)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51
	GetHandler() Handler
52 53
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54 55
// Service is a networking component that protocols can use to multiplex
// messages over the same channel, and to issue + handle requests.
56
type service struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57
	// Handler is the object registered to handle incoming requests.
58 59
	Handler     Handler
	HandlerLock sync.RWMutex
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60 61 62 63 64 65 66

	// Requests are all the pending requests on this service.
	Requests     RequestMap
	RequestsLock sync.RWMutex

	// Message Pipe (connected to the outside world)
	*msg.Pipe
67
	ctxc.ContextCloser
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68 69 70
}

// NewService creates a service object with given type ID and Handler
71 72 73 74 75 76
func NewService(ctx context.Context, h Handler) Service {
	s := &service{
		Handler:       h,
		Requests:      RequestMap{},
		Pipe:          msg.NewPipe(10),
		ContextCloser: ctxc.NewContextCloser(ctx, nil),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77 78
	}

79 80 81
	s.Children().Add(1)
	go s.handleIncomingMessages()
	return s
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82 83
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
84
// GetPipe implements the mux.Protocol interface
85
func (s *service) GetPipe() *msg.Pipe {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86 87 88
	return s.Pipe
}

89
// sendMessage sends a message out (actual leg work. SendMessage is to export w/o rid)
90
func (s *service) sendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
91 92

	// serialize ServiceMessage wrapper
93
	data, err := wrapData(m.Data(), rid)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
94 95 96 97
	if err != nil {
		return err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98
	// log.Debug("Service send message [to = %s]", m.Peer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
100
	// send message
101
	m2 := msg.New(m.Peer(), data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102 103 104 105 106 107 108 109 110
	select {
	case s.Outgoing <- m2:
	case <-ctx.Done():
		return ctx.Err()
	}

	return nil
}

111
// SendMessage sends a message out
112
func (s *service) SendMessage(ctx context.Context, m msg.NetMessage) error {
113 114 115
	return s.sendMessage(ctx, m, nil)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116
// SendRequest sends a request message out and awaits a response.
117
func (s *service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118

119 120 121 122 123 124 125 126 127
	// check if we should bail given our contexts
	select {
	default:
	case <-s.Closing():
		return nil, fmt.Errorf("service closed: %s", s.Context().Err())
	case <-ctx.Done():
		return nil, ctx.Err()
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
128
	// create a request
129
	r, err := NewRequest(m.Peer().ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
	if err != nil {
		return nil, err
	}

	// register Request
	s.RequestsLock.Lock()
	s.Requests[r.Key()] = r
	s.RequestsLock.Unlock()

	// defer deleting this request
	defer func() {
		s.RequestsLock.Lock()
		delete(s.Requests, r.Key())
		s.RequestsLock.Unlock()
	}()

	// check if we should bail after waiting for mutex
	select {
	default:
149 150
	case <-s.Closing():
		return nil, fmt.Errorf("service closed: %s", s.Context().Err())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151 152 153 154 155
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// Send message
156
	s.sendMessage(ctx, m, r.ID)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157 158 159 160 161 162

	// wait for response
	m = nil
	err = nil
	select {
	case m = <-r.Response:
163 164
	case <-s.Closed():
		err = fmt.Errorf("service closed: %s", s.Context().Err())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165 166 167 168
	case <-ctx.Done():
		err = ctx.Err()
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169 170 171 172
	if m == nil {
		return nil, ErrNoResponse
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
173 174 175 176 177
	return m, err
}

// handleIncoming consumes the messages on the s.Incoming channel and
// routes them appropriately (to requests, or handler).
178 179 180
func (s *service) handleIncomingMessages() {
	defer s.Children().Done()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181 182
	for {
		select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
183 184 185 186
		case m, more := <-s.Incoming:
			if !more {
				return
			}
187 188
			s.Children().Add(1)
			go s.handleIncomingMessage(m)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
189

190
		case <-s.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191 192 193 194 195
			return
		}
	}
}

196 197
func (s *service) handleIncomingMessage(m msg.NetMessage) {
	defer s.Children().Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198 199

	// unwrap the incoming message
200
	data, rid, err := unwrapData(m.Data())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
201
	if err != nil {
202 203
		log.Errorf("service de-serializing error: %v", err)
		return
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204
	}
205

206
	m2 := msg.New(m.Peer(), data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207 208 209

	// if it's a request (or has no RequestID), handle it
	if rid == nil || rid.IsRequest() {
210 211
		handler := s.GetHandler()
		if handler == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212
			log.Errorf("service dropped msg: %v", m)
213 214 215
			return // no handler, drop it.
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
216
		// should this be "go HandleMessage ... ?"
217
		r1 := handler.HandleMessage(s.Context(), m2)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218 219 220

		// if handler gave us a response, send it back out!
		if r1 != nil {
221
			err := s.sendMessage(s.Context(), r1, rid.Response())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
222
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
223
				log.Errorf("error sending response message: %v", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
224 225 226 227 228 229 230
			}
		}
		return
	}

	// Otherwise, it is a response. handle it.
	if !rid.IsResponse() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
231
		log.Errorf("RequestID should identify a response here.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
232 233
	}

234
	key := RequestKey(m.Peer().ID(), RequestID(rid))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
235 236 237 238 239
	s.RequestsLock.RLock()
	r, found := s.Requests[key]
	s.RequestsLock.RUnlock()

	if !found {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
240
		log.Errorf("no request key %v (timeout?)", []byte(key))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
241 242 243 244 245
		return
	}

	select {
	case r.Response <- m2:
246
	case <-s.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
247 248
	}
}
249

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
250
// SetHandler assigns the request Handler for this service.
251
func (s *service) SetHandler(h Handler) {
252 253
	s.HandlerLock.Lock()
	defer s.HandlerLock.Unlock()
254 255
	s.Handler = h
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
256 257 258

// GetHandler returns the request Handler for this service.
func (s *service) GetHandler() Handler {
259 260
	s.HandlerLock.RLock()
	defer s.HandlerLock.RUnlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
261 262
	return s.Handler
}