123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682 |
- // Copyright 2009 The Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- /*
- Package rpc is a trimmed down version of net/rpc in the standard library.
- Original doc:
- Package rpc provides access to the exported methods of an object across a
- network or other I/O connection. A server registers an object, making it visible
- as a service with the name of the type of the object. After registration, exported
- methods of the object will be accessible remotely. A server may register multiple
- objects (services) of different types but it is an error to register multiple
- objects of the same type.
- Only methods that satisfy these criteria will be made available for remote access;
- other methods will be ignored:
- - the method's type is exported.
- - the method is exported.
- - the method has two arguments, both exported (or builtin) types.
- - the method's second argument is a pointer.
- - the method has return type error.
- In effect, the method must look schematically like
- func (t *T) MethodName(argType T1, replyType *T2) error
- where T1 and T2 can be marshaled by encoding/gob.
- These requirements apply even if a different codec is used.
- (In the future, these requirements may soften for custom codecs.)
- The method's first argument represents the arguments provided by the caller; the
- second argument represents the result parameters to be returned to the caller.
- The method's return value, if non-nil, is passed back as a string that the client
- sees as if created by errors.New. If an error is returned, the reply parameter
- will not be sent back to the client.
- The server may handle requests on a single connection by calling ServeConn. More
- typically it will create a network listener and call Accept or, for an HTTP
- listener, HandleHTTP and http.Serve.
- A client wishing to use the service establishes a connection and then invokes
- NewClient on the connection. The convenience function Dial (DialHTTP) performs
- both steps for a raw network connection (an HTTP connection). The resulting
- Client object has two methods, Call and Go, that specify the service and method to
- call, a pointer containing the arguments, and a pointer to receive the result
- parameters.
- The Call method waits for the remote call to complete while the Go method
- launches the call asynchronously and signals completion using the Call
- structure's Done channel.
- Unless an explicit codec is set up, package encoding/gob is used to
- transport the data.
- Here is a simple example. A server wishes to export an object of type Arith:
- package server
- import "errors"
- type Args struct {
- A, B int
- }
- type Quotient struct {
- Quo, Rem int
- }
- type Arith int
- func (t *Arith) Multiply(args *Args, reply *int) error {
- *reply = args.A * args.B
- return nil
- }
- func (t *Arith) Divide(args *Args, quo *Quotient) error {
- if args.B == 0 {
- return errors.New("divide by zero")
- }
- quo.Quo = args.A / args.B
- quo.Rem = args.A % args.B
- return nil
- }
- The server calls (for HTTP service):
- arith := new(Arith)
- rpc.Register(arith)
- rpc.HandleHTTP()
- l, e := net.Listen("tcp", ":1234")
- if e != nil {
- log.Fatal("listen error:", e)
- }
- go http.Serve(l, nil)
- At this point, clients can see a service "Arith" with methods "Arith.Multiply" and
- "Arith.Divide". To invoke one, a client first dials the server:
- client, err := rpc.DialHTTP("tcp", serverAddress + ":1234")
- if err != nil {
- log.Fatal("dialing:", err)
- }
- Then it can make a remote call:
- // Synchronous call
- args := &server.Args{7,8}
- var reply int
- err = client.Call("Arith.Multiply", args, &reply)
- if err != nil {
- log.Fatal("arith error:", err)
- }
- fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
- or
- // Asynchronous call
- quotient := new(Quotient)
- divCall := client.Go("Arith.Divide", args, quotient, nil)
- replyCall := <-divCall.Done // will be equal to divCall
- // check errors, print, etc.
- A server implementation will often provide a simple, type-safe wrapper for the
- client.
- The net/rpc package is frozen and is not accepting new features.
- */
- package rpc
- import (
- "bufio"
- "encoding/gob"
- "errors"
- "go/token"
- "io"
- "log"
- "net"
- "reflect"
- "strings"
- "sync"
- )
// typeOfError is the reflect.Type of the error interface, computed once.
// reflect.TypeOf only sees the dynamic type of the value it is handed, so the
// interface type is recovered by taking the element type of a *error.
var typeOfError = reflect.TypeOf(new(error)).Elem()
// methodType describes one RPC-callable method of a registered receiver,
// together with a counter of how many calls to it have been started.
type methodType struct {
	sync.Mutex // protects counters
	// method is the reflect.Method handle used to invoke the method.
	method reflect.Method
	// ArgType is the type of the method's first (argument) parameter.
	ArgType reflect.Type
	// ReplyType is the type of the method's second (reply) parameter;
	// suitableMethods only accepts pointer types here.
	ReplyType reflect.Type
	// numCalls is incremented once per call, under the embedded Mutex.
	numCalls uint
}
// service is one registered receiver together with the set of its methods
// that were found suitable for remote invocation.
type service struct {
	name   string                 // name of service
	rcvr   reflect.Value          // receiver of methods for the service
	typ    reflect.Type           // type of the receiver
	method map[string]*methodType // registered methods, keyed by method name
}
// Request is a header written before every RPC call. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
//
// The Server recycles Request values through a free list linked by the
// unexported next field.
type Request struct {
	ServiceMethod string   // format: "Service.Method"
	Seq           uint64   // sequence number chosen by client
	next          *Request // for free list in Server
}
// Response is a header written before every RPC return. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
//
// The Server recycles Response values through a free list linked by the
// unexported next field.
type Response struct {
	ServiceMethod string    // echoes that of the Request
	Seq           uint64    // echoes that of the request
	Error         string    // error, if any.
	next          *Response // for free list in Server
}
// Server represents an RPC Server. It holds the registered services and two
// mutex-guarded free lists used to recycle Request and Response headers.
type Server struct {
	serviceMap sync.Map   // map[string]*service, keyed by service name
	reqLock    sync.Mutex // protects freeReq
	freeReq    *Request   // head of the list of reusable Request headers
	respLock   sync.Mutex // protects freeResp
	freeResp   *Response  // head of the list of reusable Response headers
}
- // NewServer returns a new Server.
- func NewServer() *Server {
- return &Server{}
- }
// DefaultServer is the default instance of *Server. The package-level
// Register, RegisterName, ServeConn, ServeCodec, ServeRequest and Accept
// functions all operate on it.
var DefaultServer = NewServer()
// isExportedOrBuiltinType reports whether t, after stripping every level of
// pointer indirection, is either a type with an empty package path (builtin
// and unnamed types) or a named type whose name is exported.
func isExportedOrBuiltinType(t reflect.Type) bool {
	base := t
	for base.Kind() == reflect.Ptr {
		base = base.Elem()
	}
	if base.PkgPath() == "" {
		return true
	}
	// PkgPath is non-empty for exported named types too, so the name
	// itself decides.
	return token.IsExported(base.Name())
}
- // Register publishes in the server the set of methods of the
- // receiver value that satisfy the following conditions:
- // - exported method of exported type
- // - two arguments, both of exported type
- // - the second argument is a pointer
- // - one return value, of type error
- //
- // It returns an error if the receiver is not an exported type or has
- // no suitable methods. It also logs the error using package log.
- // The client accesses each method using a string of the form "Type.Method",
- // where Type is the receiver's concrete type.
- func (server *Server) Register(rcvr any) error {
- return server.register(rcvr, "", false)
- }
- // RegisterName is like Register but uses the provided name for the type
- // instead of the receiver's concrete type.
- func (server *Server) RegisterName(name string, rcvr any) error {
- return server.register(rcvr, name, true)
- }
- func (server *Server) register(rcvr any, name string, useName bool) error {
- s := new(service)
- s.typ = reflect.TypeOf(rcvr)
- s.rcvr = reflect.ValueOf(rcvr)
- sname := reflect.Indirect(s.rcvr).Type().Name()
- if useName {
- sname = name
- }
- if sname == "" {
- s := "rpc.Register: no service name for type " + s.typ.String()
- log.Print(s)
- return errors.New(s)
- }
- if !token.IsExported(sname) && !useName {
- s := "rpc.Register: type " + sname + " is not exported"
- log.Print(s)
- return errors.New(s)
- }
- s.name = sname
- // Install the methods
- s.method = suitableMethods(s.typ, true)
- if len(s.method) == 0 {
- str := ""
- // To help the user, see if a pointer receiver would work.
- method := suitableMethods(reflect.PtrTo(s.typ), false)
- if len(method) != 0 {
- str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
- } else {
- str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
- }
- log.Print(str)
- return errors.New(str)
- }
- if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
- return errors.New("rpc: service already defined: " + sname)
- }
- return nil
- }
- // suitableMethods returns suitable Rpc methods of typ, it will report
- // error using log if reportErr is true.
- func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
- methods := make(map[string]*methodType)
- for m := 0; m < typ.NumMethod(); m++ {
- method := typ.Method(m)
- mtype := method.Type
- mname := method.Name
- // Method must be exported.
- if method.PkgPath != "" {
- continue
- }
- // Method needs three ins: receiver, *args, *reply.
- if mtype.NumIn() != 3 {
- if reportErr {
- log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
- }
- continue
- }
- // First arg need not be a pointer.
- argType := mtype.In(1)
- if !isExportedOrBuiltinType(argType) {
- if reportErr {
- log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
- }
- continue
- }
- // Second arg must be a pointer.
- replyType := mtype.In(2)
- if replyType.Kind() != reflect.Ptr {
- if reportErr {
- log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
- }
- continue
- }
- // Reply type must be exported.
- if !isExportedOrBuiltinType(replyType) {
- if reportErr {
- log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
- }
- continue
- }
- // Method needs one out.
- if mtype.NumOut() != 1 {
- if reportErr {
- log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
- }
- continue
- }
- // The return type of the method must be error.
- if returnType := mtype.Out(0); returnType != typeOfError {
- if reportErr {
- log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
- }
- continue
- }
- methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
- }
- return methods
- }
// invalidRequest is the placeholder body written in place of a real reply
// whenever a response carries an error. The client never decodes it, because
// the Response header already holds the error.
var invalidRequest struct{}
- func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply any, codec ServerCodec, errmsg string) {
- resp := server.getResponse()
- // Encode the response header
- resp.ServiceMethod = req.ServiceMethod
- if errmsg != "" {
- resp.Error = errmsg
- reply = invalidRequest
- }
- resp.Seq = req.Seq
- sending.Lock()
- err := codec.WriteResponse(resp, reply)
- if debugLog && err != nil {
- log.Println("rpc: writing response:", err)
- }
- sending.Unlock()
- server.freeResponse(resp)
- }
- func (m *methodType) NumCalls() (n uint) {
- m.Lock()
- n = m.numCalls
- m.Unlock()
- return n
- }
- func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
- if wg != nil {
- defer wg.Done()
- }
- mtype.Lock()
- mtype.numCalls++
- mtype.Unlock()
- function := mtype.method.Func
- // Invoke the method, providing a new value for the reply.
- returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
- // The return value for the method is an error.
- errInter := returnValues[0].Interface()
- errmsg := ""
- if errInter != nil {
- errmsg = errInter.(error).Error()
- }
- server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
- server.freeRequest(req)
- }
// gobServerCodec is the ServerCodec used by ServeConn. It decodes requests
// with a gob.Decoder and encodes responses with a gob.Encoder that writes
// through a buffered writer.
type gobServerCodec struct {
	rwc    io.ReadWriteCloser // underlying connection; closed by Close
	dec    *gob.Decoder       // decodes request headers and bodies
	enc    *gob.Encoder       // encodes response headers and bodies into encBuf
	encBuf *bufio.Writer      // buffers encoded output; flushed by WriteResponse
	closed bool               // set once Close has run, so rwc is closed only once
}
- func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
- return c.dec.Decode(r)
- }
- func (c *gobServerCodec) ReadRequestBody(body any) error {
- return c.dec.Decode(body)
- }
- func (c *gobServerCodec) WriteResponse(r *Response, body any) (err error) {
- if err = c.enc.Encode(r); err != nil {
- if c.encBuf.Flush() == nil {
- // Gob couldn't encode the header. Should not happen, so if it does,
- // shut down the connection to signal that the connection is broken.
- log.Println("rpc: gob error encoding response:", err)
- c.Close()
- }
- return
- }
- if err = c.enc.Encode(body); err != nil {
- if c.encBuf.Flush() == nil {
- // Was a gob problem encoding the body but the header has been written.
- // Shut down the connection to signal that the connection is broken.
- log.Println("rpc: gob error encoding body:", err)
- c.Close()
- }
- return
- }
- return c.encBuf.Flush()
- }
- func (c *gobServerCodec) Close() error {
- if c.closed {
- // Only call c.rwc.Close once; otherwise the semantics are undefined.
- return nil
- }
- c.closed = true
- return c.rwc.Close()
- }
- // ServeConn runs the server on a single connection.
- // ServeConn blocks, serving the connection until the client hangs up.
- // The caller typically invokes ServeConn in a go statement.
- // ServeConn uses the gob wire format (see package gob) on the
- // connection. To use an alternate codec, use ServeCodec.
- // See NewClient's comment for information about concurrent access.
- func (server *Server) ServeConn(conn io.ReadWriteCloser) {
- buf := bufio.NewWriter(conn)
- srv := &gobServerCodec{
- rwc: conn,
- dec: gob.NewDecoder(conn),
- enc: gob.NewEncoder(buf),
- encBuf: buf,
- }
- server.ServeCodec(srv)
- }
- // ServeCodec is like ServeConn but uses the specified codec to
- // decode requests and encode responses.
- func (server *Server) ServeCodec(codec ServerCodec) {
- sending := new(sync.Mutex)
- wg := new(sync.WaitGroup)
- for {
- service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
- if err != nil {
- if debugLog && err != io.EOF {
- log.Println("rpc:", err)
- }
- if !keepReading {
- break
- }
- // send a response if we actually managed to read a header.
- if req != nil {
- server.sendResponse(sending, req, invalidRequest, codec, err.Error())
- server.freeRequest(req)
- }
- continue
- }
- wg.Add(1)
- go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
- }
- // We've seen that there are no more requests.
- // Wait for responses to be sent before closing codec.
- wg.Wait()
- codec.Close()
- }
- // ServeRequest is like ServeCodec but synchronously serves a single request.
- // It does not close the codec upon completion.
- func (server *Server) ServeRequest(codec ServerCodec) error {
- sending := new(sync.Mutex)
- service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
- if err != nil {
- if !keepReading {
- return err
- }
- // send a response if we actually managed to read a header.
- if req != nil {
- server.sendResponse(sending, req, invalidRequest, codec, err.Error())
- server.freeRequest(req)
- }
- return err
- }
- service.call(server, sending, nil, mtype, req, argv, replyv, codec)
- return nil
- }
- func (server *Server) getRequest() *Request {
- server.reqLock.Lock()
- req := server.freeReq
- if req == nil {
- req = new(Request)
- } else {
- server.freeReq = req.next
- *req = Request{}
- }
- server.reqLock.Unlock()
- return req
- }
- func (server *Server) freeRequest(req *Request) {
- server.reqLock.Lock()
- req.next = server.freeReq
- server.freeReq = req
- server.reqLock.Unlock()
- }
- func (server *Server) getResponse() *Response {
- server.respLock.Lock()
- resp := server.freeResp
- if resp == nil {
- resp = new(Response)
- } else {
- server.freeResp = resp.next
- *resp = Response{}
- }
- server.respLock.Unlock()
- return resp
- }
- func (server *Server) freeResponse(resp *Response) {
- server.respLock.Lock()
- resp.next = server.freeResp
- server.freeResp = resp
- server.respLock.Unlock()
- }
- func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
- service, mtype, req, keepReading, err = server.readRequestHeader(codec)
- if err != nil {
- if !keepReading {
- return
- }
- // discard body
- codec.ReadRequestBody(nil)
- return
- }
- // Decode the argument value.
- argIsValue := false // if true, need to indirect before calling.
- if mtype.ArgType.Kind() == reflect.Ptr {
- argv = reflect.New(mtype.ArgType.Elem())
- } else {
- argv = reflect.New(mtype.ArgType)
- argIsValue = true
- }
- // argv guaranteed to be a pointer now.
- if err = codec.ReadRequestBody(argv.Interface()); err != nil {
- return
- }
- if argIsValue {
- argv = argv.Elem()
- }
- replyv = reflect.New(mtype.ReplyType.Elem())
- switch mtype.ReplyType.Elem().Kind() {
- case reflect.Map:
- replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
- case reflect.Slice:
- replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
- }
- return
- }
- func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
- // Grab the request header.
- req = server.getRequest()
- err = codec.ReadRequestHeader(req)
- if err != nil {
- req = nil
- if err == io.EOF || err == io.ErrUnexpectedEOF {
- return
- }
- err = errors.New("rpc: server cannot decode request: " + err.Error())
- return
- }
- // We read the header successfully. If we see an error now,
- // we can still recover and move on to the next request.
- keepReading = true
- dot := strings.LastIndex(req.ServiceMethod, ".")
- if dot < 0 {
- err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
- return
- }
- serviceName := req.ServiceMethod[:dot]
- methodName := req.ServiceMethod[dot+1:]
- // Look up the request.
- svci, ok := server.serviceMap.Load(serviceName)
- if !ok {
- err = errors.New("rpc: can't find service " + req.ServiceMethod)
- return
- }
- svc = svci.(*service)
- mtype = svc.method[methodName]
- if mtype == nil {
- err = errors.New("rpc: can't find method " + req.ServiceMethod)
- }
- return
- }
- // Accept accepts connections on the listener and serves requests
- // for each incoming connection. Accept blocks until the listener
- // returns a non-nil error. The caller typically invokes Accept in a
- // go statement.
- func (server *Server) Accept(lis net.Listener) {
- for {
- conn, err := lis.Accept()
- if err != nil {
- log.Print("rpc.Serve: accept:", err.Error())
- return
- }
- go server.ServeConn(conn)
- }
- }
- // Register publishes the receiver's methods in the DefaultServer.
- func Register(rcvr any) error { return DefaultServer.Register(rcvr) }
- // RegisterName is like Register but uses the provided name for the type
- // instead of the receiver's concrete type.
- func RegisterName(name string, rcvr any) error {
- return DefaultServer.RegisterName(name, rcvr)
- }
// A ServerCodec implements reading of RPC requests and writing of
// RPC responses for the server side of an RPC session.
// The server calls ReadRequestHeader and ReadRequestBody in pairs
// to read requests from the connection, and it calls WriteResponse to
// write a response back. The server calls Close when finished with the
// connection. ReadRequestBody may be called with a nil
// argument to force the body of the request to be read and discarded.
// See NewClient's comment for information about concurrent access.
type ServerCodec interface {
	// ReadRequestHeader reads the next request header into the given Request.
	ReadRequestHeader(*Request) error
	// ReadRequestBody reads the request body into its argument; the server
	// passes nil when the body should be read and discarded.
	ReadRequestBody(any) error
	// WriteResponse writes a response header followed by its body.
	WriteResponse(*Response, any) error

	// Close can be called multiple times and must be idempotent.
	Close() error
}
- // ServeConn runs the DefaultServer on a single connection.
- // ServeConn blocks, serving the connection until the client hangs up.
- // The caller typically invokes ServeConn in a go statement.
- // ServeConn uses the gob wire format (see package gob) on the
- // connection. To use an alternate codec, use ServeCodec.
- // See NewClient's comment for information about concurrent access.
- func ServeConn(conn io.ReadWriteCloser) {
- DefaultServer.ServeConn(conn)
- }
- // ServeCodec is like ServeConn but uses the specified codec to
- // decode requests and encode responses.
- func ServeCodec(codec ServerCodec) {
- DefaultServer.ServeCodec(codec)
- }
- // ServeRequest is like ServeCodec but synchronously serves a single request.
- // It does not close the codec upon completion.
- func ServeRequest(codec ServerCodec) error {
- return DefaultServer.ServeRequest(codec)
- }
- // Accept accepts connections on the listener and serves requests
- // to DefaultServer for each incoming connection.
- // Accept blocks; the caller typically invokes it in a go statement.
- func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
|