/* * $Id: rpc_server.cpp,v 1.1 2006/02/13 23:49:17 srhea Exp $ * * Copyright (c) 2006 Sean C. Rhea (srhea@srhea.net) * * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software and associated documentation * files (the "Software"), to deal in the Software without * restriction, including without limitation the rights to use, copy, * modify, merge, publish, distribute, sublicense, and/or sell copies * of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * DEALINGS IN THE SOFTWARE. * */ #include #include #include #include #include #include #include #include "core/inetaddr.hpp" #include "core/mainloop.hpp" #include "core/socket.hpp" #include "rpc_channel.hpp" // Next two are just for test harness #include "rpc_channel_dgram.hpp" #include "example.h" using namespace std; using namespace boost; using namespace async; class rpc_server { typedef shared_ptr srcptr; typedef function reqcb; typedef map proc_map; typedef map vers_map; typedef map prog_map; prog_map progs; shared_ptr stream; public: rpc_server(shared_ptr stream) : stream(stream) { stream->set_req_handler( bind(&rpc_server::handle_req, this, _1, _2, _3)); } virtual ~rpc_server() { if (stream) stream->set_req_handler(0); } void handle_req(srcptr src, int xact_id, XDR *xdr) { unsigned rpcvers; if (!xdr_u_int(xdr, &rpcvers)) { reply_fail(src, xact_id, GARBAGE_ARGS); return; } if (rpcvers != 2) { reply_fail(src, xact_id, GARBAGE_ARGS); return; } unsigned prog; if (!xdr_u_int(xdr, &prog)) { reply_fail(src, xact_id, GARBAGE_ARGS); return; } vers_map* verss = 0; prog_map::iterator i = progs.find(prog); if (i == progs.end()) { reply_fail(src, xact_id, PROG_UNAVAIL); return; } verss = i->second; unsigned vers; if (!xdr_u_int(xdr, &vers)) { reply_fail(src, xact_id, GARBAGE_ARGS); return; } proc_map* procs = 0; vers_map::iterator j = verss->find(vers); if (j == verss->end()) { int low = 0, high = 0; if (!verss->empty()) { low = verss->begin()->first; high = verss->rbegin()->first; } reply_mismatch(xact_id, low, high); return; } procs = j->second; unsigned proc; if (!xdr_u_int(xdr, &proc)) { reply_fail(src, xact_id, GARBAGE_ARGS); return; } proc_map::iterator k = procs->find(proc); if (k == procs->end()) { reply_fail(src, xact_id, PROC_UNAVAIL); return; } reqcb cb = k->second; unsigned cred_type; if (!xdr_u_int(xdr, &cred_type)) { reply_fail(src, xact_id, GARBAGE_ARGS); return; } if (cred_type != AUTH_NULL) { reply_fail(src, xact_id, GARBAGE_ARGS); return; } unsigned cred_len; if (!xdr_u_int(xdr, &cred_len)) { reply_fail(src, xact_id, GARBAGE_ARGS); return; } if (cred_len != 0) { reply_fail(src, xact_id, GARBAGE_ARGS); return; } unsigned verf_len; if (!xdr_u_int(xdr, &verf_len)) { reply_fail(src, xact_id, GARBAGE_ARGS); return; } if (verf_len != 0) { reply_fail(src, xact_id, GARBAGE_ARGS); return; } cb(src, xact_id, xdr); } template void set_handler(int prog, int vers, int proc, bool_t (*arg_fn)(XDR *, A*), bool_t (*res_fn)(XDR *, R*), function)> req_cb) { reqcb cb = bind(&rpc_server::call_req, this, arg_fn, res_fn, req_cb, _1, _2, _3); vers_map* verss = 0; prog_map::iterator i = progs.find(prog); if (i == progs.end()) { verss = new vers_map; progs.insert(make_pair(prog, verss)); } else { verss = i->second; } proc_map* procs = 0; vers_map::iterator j = verss->find(vers); if (j == verss->end()) { procs = new proc_map; verss->insert(make_pair(vers, procs)); } else { procs = j->second; } proc_map::iterator k = procs->find(proc); if (k == procs->end()) { procs->insert(make_pair(proc, cb)); } else { assert(false); } } private: bool_t reply_fail_encoder(unsigned code, XDR *xdr) { unsigned accepted = MSG_ACCEPTED; unsigned auth_type = 0; unsigned auth_len = 0; if (xdr_u_int(xdr, &accepted) && xdr_u_int(xdr, &auth_type) && xdr_u_int(xdr, &auth_len) && xdr_u_int(xdr, &code)) return true; return false; } void reply_fail(srcptr src, int xact_id, unsigned code) { stream->send_resp( src, xact_id, bind(&rpc_server::reply_fail_encoder, this, code, _1)); } void reply_mismatch(int xact_id, int low, int high) { assert(false); } template bool_t xdr_encode(bool_t (*xdr_fn)(XDR *, T*), T* value, XDR *xdr) { return xdr_fn(xdr, value); } template void send_res_cb(srcptr src, int xact_id, bool_t (*res_fn)(XDR *, R*), A *args, R *res) { stream->send_resp(src, xact_id, bind(&rpc_server::xdr_encode, this, res_fn, res, _1)); delete args; } template void call_req(bool_t (*arg_fn)(XDR *, A*), bool_t (*res_fn)(XDR *, R*), function)> req_cb, srcptr src, int xact_id, XDR *in) { A *args = new A(); if (arg_fn(in, args)) { req_cb(args, bind(&rpc_server::send_res_cb, this, src, xact_id, res_fn, args, _1)); } else { delete args; reply_fail(xact_id, GARBAGE_ARGS); } } }; // Test harness follows: static void foo_handler(example_args *args, function rescb) { printf("foo called: args.str=%s, args.i=%d\n", args->str, args->i); } mainloop *acore = new mainloop; shared_ptr srv; int main(int argc, char *argv[]) { int port = atoi(argv[1]); int fd = inetsocket(SOCK_DGRAM, port); shared_ptr chnl = shared_ptr(new rpc_channel_dgram(acore, fd)); srv = shared_ptr(new rpc_server(chnl)); function)> f = foo_handler; srv->set_handler(EXAMPLE_PROG, EXAMPLE_VERS_1, EXAMPLE_PROC_FOO, xdr_example_args, xdr_example_res, f); acore->run(); return 0; }