123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- // Copyright 2009 The Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- package rpc
- import (
- "bufio"
- "encoding/gob"
- "errors"
- "io"
- "log"
- "net"
- "sync"
- )
- // ServerError represents an error that has been returned from
- // the remote side of the RPC connection.
- type ServerError string
- func (e ServerError) Error() string {
- return string(e)
- }
- var ErrShutdown = errors.New("connection is shut down")
- // Call represents an active RPC.
- type Call struct {
- ServiceMethod string // The name of the service and method to call.
- Args any // The argument to the function (*struct).
- Reply any // The reply from the function (*struct).
- Error error // After completion, the error status.
- Done chan *Call // Receives *Call when Go is complete.
- }
- // Client represents an RPC Client.
- // There may be multiple outstanding Calls associated
- // with a single Client, and a Client may be used by
- // multiple goroutines simultaneously.
- type Client struct {
- codec ClientCodec
- reqMutex sync.Mutex // protects following
- request Request
- mutex sync.Mutex // protects following
- seq uint64
- pending map[uint64]*Call
- closing bool // user has called Close
- shutdown bool // server has told us to stop
- }
- // A ClientCodec implements writing of RPC requests and
- // reading of RPC responses for the client side of an RPC session.
- // The client calls WriteRequest to write a request to the connection
- // and calls ReadResponseHeader and ReadResponseBody in pairs
- // to read responses. The client calls Close when finished with the
- // connection. ReadResponseBody may be called with a nil
- // argument to force the body of the response to be read and then
- // discarded.
- // See NewClient's comment for information about concurrent access.
- type ClientCodec interface {
- WriteRequest(*Request, any) error
- ReadResponseHeader(*Response) error
- ReadResponseBody(any) error
- Close() error
- }
- func (client *Client) send(call *Call) {
- client.reqMutex.Lock()
- defer client.reqMutex.Unlock()
- // Register this call.
- client.mutex.Lock()
- if client.shutdown || client.closing {
- client.mutex.Unlock()
- call.Error = ErrShutdown
- call.done()
- return
- }
- seq := client.seq
- client.seq++
- client.pending[seq] = call
- client.mutex.Unlock()
- // Encode and send the request.
- client.request.Seq = seq
- client.request.ServiceMethod = call.ServiceMethod
- err := client.codec.WriteRequest(&client.request, call.Args)
- if err != nil {
- client.mutex.Lock()
- call = client.pending[seq]
- delete(client.pending, seq)
- client.mutex.Unlock()
- if call != nil {
- call.Error = err
- call.done()
- }
- }
- }
- func (client *Client) input() {
- var err error
- var response Response
- for err == nil {
- response = Response{}
- err = client.codec.ReadResponseHeader(&response)
- if err != nil {
- break
- }
- seq := response.Seq
- client.mutex.Lock()
- call := client.pending[seq]
- delete(client.pending, seq)
- client.mutex.Unlock()
- switch {
- case call == nil:
- // We've got no pending call. That usually means that
- // WriteRequest partially failed, and call was already
- // removed; response is a server telling us about an
- // error reading request body. We should still attempt
- // to read error body, but there's no one to give it to.
- err = client.codec.ReadResponseBody(nil)
- if err != nil {
- err = errors.New("reading error body: " + err.Error())
- }
- case response.Error != "":
- // We've got an error response. Give this to the request;
- // any subsequent requests will get the ReadResponseBody
- // error if there is one.
- call.Error = ServerError(response.Error)
- err = client.codec.ReadResponseBody(nil)
- if err != nil {
- err = errors.New("reading error body: " + err.Error())
- }
- call.done()
- default:
- err = client.codec.ReadResponseBody(call.Reply)
- if err != nil {
- call.Error = errors.New("reading body " + err.Error())
- }
- call.done()
- }
- }
- // Terminate pending calls.
- client.reqMutex.Lock()
- client.mutex.Lock()
- client.shutdown = true
- closing := client.closing
- if err == io.EOF {
- if closing {
- err = ErrShutdown
- } else {
- err = io.ErrUnexpectedEOF
- }
- }
- for _, call := range client.pending {
- call.Error = err
- call.done()
- }
- client.mutex.Unlock()
- client.reqMutex.Unlock()
- if debugLog && err != io.EOF && !closing {
- log.Println("rpc: client protocol error:", err)
- }
- }
- func (call *Call) done() {
- select {
- case call.Done <- call:
- // ok
- default:
- // We don't want to block here. It is the caller's responsibility to make
- // sure the channel has enough buffer space. See comment in Go().
- if debugLog {
- log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
- }
- }
- }
- // NewClient returns a new Client to handle requests to the
- // set of services at the other end of the connection.
- // It adds a buffer to the write side of the connection so
- // the header and payload are sent as a unit.
- //
- // The read and write halves of the connection are serialized independently,
- // so no interlocking is required. However each half may be accessed
- // concurrently so the implementation of conn should protect against
- // concurrent reads or concurrent writes.
- func NewClient(conn io.ReadWriteCloser) *Client {
- encBuf := bufio.NewWriter(conn)
- client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
- return NewClientWithCodec(client)
- }
- // NewClientWithCodec is like NewClient but uses the specified
- // codec to encode requests and decode responses.
- func NewClientWithCodec(codec ClientCodec) *Client {
- client := &Client{
- codec: codec,
- pending: make(map[uint64]*Call),
- }
- go client.input()
- return client
- }
- type gobClientCodec struct {
- rwc io.ReadWriteCloser
- dec *gob.Decoder
- enc *gob.Encoder
- encBuf *bufio.Writer
- }
- func (c *gobClientCodec) WriteRequest(r *Request, body any) (err error) {
- if err = c.enc.Encode(r); err != nil {
- return
- }
- if err = c.enc.Encode(body); err != nil {
- return
- }
- return c.encBuf.Flush()
- }
- func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
- return c.dec.Decode(r)
- }
- func (c *gobClientCodec) ReadResponseBody(body any) error {
- return c.dec.Decode(body)
- }
- func (c *gobClientCodec) Close() error {
- return c.rwc.Close()
- }
- // Dial connects to an RPC server at the specified network address.
- func Dial(network, address string) (*Client, error) {
- conn, err := net.Dial(network, address)
- if err != nil {
- return nil, err
- }
- return NewClient(conn), nil
- }
- // Close calls the underlying codec's Close method. If the connection is already
- // shutting down, ErrShutdown is returned.
- func (client *Client) Close() error {
- client.mutex.Lock()
- if client.closing {
- client.mutex.Unlock()
- return ErrShutdown
- }
- client.closing = true
- client.mutex.Unlock()
- return client.codec.Close()
- }
- // Go invokes the function asynchronously. It returns the Call structure representing
- // the invocation. The done channel will signal when the call is complete by returning
- // the same Call object. If done is nil, Go will allocate a new channel.
- // If non-nil, done must be buffered or Go will deliberately crash.
- func (client *Client) Go(serviceMethod string, args any, reply any, done chan *Call) *Call {
- call := new(Call)
- call.ServiceMethod = serviceMethod
- call.Args = args
- call.Reply = reply
- if done == nil {
- done = make(chan *Call, 10) // buffered.
- } else {
- // If caller passes done != nil, it must arrange that
- // done has enough buffer for the number of simultaneous
- // RPCs that will be using that channel. If the channel
- // is totally unbuffered, it's best not to run at all.
- if cap(done) == 0 {
- log.Panic("rpc: done channel is unbuffered")
- }
- }
- call.Done = done
- client.send(call)
- return call
- }
- // Call invokes the named function, waits for it to complete, and returns its error status.
- func (client *Client) Call(serviceMethod string, args any, reply any) error {
- call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
- return call.Error
- }
|