1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21
22#include <trex_rpc_server_api.h>
23#include <trex_rpc_req_resp_server.h>
24#include <trex_rpc_jsonrpc_v2_parser.h>
25#include <trex_rpc_zip.h>
26
27#include <unistd.h>
28#include <sstream>
29#include <iostream>
30#include <assert.h>
31
32#include <zmq.h>
33#include <json/json.h>
34
35#include "trex_watchdog.h"
36
37/**
38 * ZMQ based request-response server
39 *
40 */
41TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "ZMQ sync request-response") {
42
43}
44
45void TrexRpcServerReqRes::_prepare() {
46    m_context = zmq_ctx_new();
47}
48
49/**
50 * main entry point for the server
51 * this function will be created on a different thread
52 *
53 * @author imarom (17-Aug-15)
54 */
55void TrexRpcServerReqRes::_rpc_thread_cb() {
56    std::stringstream ss;
57    int zmq_rc;
58
59    pthread_setname_np(pthread_self(), "Trex ZMQ sync");
60
61    m_monitor.create(m_name, 1);
62    TrexWatchDog::getInstance().register_monitor(&m_monitor);
63
64    /* create a socket based on the configuration */
65
66    m_socket  = zmq_socket (m_context, ZMQ_REP);
67
68    /* to make sure the watchdog gets tickles form time to time we give a timeout of 500ms */
69    int timeout = 500;
70    zmq_rc = zmq_setsockopt (m_socket, ZMQ_RCVTIMEO, &timeout, sizeof(int));
71    assert(zmq_rc == 0);
72
73    switch (m_cfg.get_protocol()) {
74    case TrexRpcServerConfig::RPC_PROT_TCP:
75        ss << "tcp://*:";
76        break;
77    default:
78        throw TrexRpcException("unknown protocol for RPC");
79    }
80
81    ss << m_cfg.get_port();
82
83    /* bind the scoket */
84    zmq_rc = zmq_bind (m_socket, ss.str().c_str());
85    if (zmq_rc != 0) {
86        throw TrexRpcException("Unable to start ZMQ server at: " + ss.str());
87    }
88
89    /* server main loop */
90    while (m_is_running) {
91        std::string request;
92
93        /* get the next request */
94        bool rc = fetch_one_request(request);
95        if (!rc) {
96            break;
97        }
98
99        verbose_json("Server Received: ", TrexJsonRpcV2Parser::pretty_json_str(request));
100
101        handle_request(request);
102    }
103
104    /* must be done from the same thread */
105    zmq_close(m_socket);
106
107    /* done */
108    m_monitor.disable();
109}
110
111bool
112TrexRpcServerReqRes::fetch_one_request(std::string &msg) {
113
114    zmq_msg_t zmq_msg;
115    int rc;
116
117    rc = zmq_msg_init(&zmq_msg);
118    assert(rc == 0);
119
120    while (true) {
121        m_monitor.tickle();
122
123        rc = zmq_msg_recv (&zmq_msg, m_socket, 0);
124        if (rc != -1) {
125            break;
126        }
127
128        /* timeout ? */
129        if (errno == EAGAIN) {
130            continue;
131        }
132
133        /* error ! */
134        zmq_msg_close(&zmq_msg);
135
136        /* normal shutdown and zmq_term was called */
137        if (errno == ETERM) {
138            return false;
139        } else {
140            throw TrexRpcException("Unhandled error of zmq_recv");
141        }
142    }
143
144
145
146    const char *data = (const char *)zmq_msg_data(&zmq_msg);
147    size_t len = zmq_msg_size(&zmq_msg);
148    msg.append(data, len);
149
150    zmq_msg_close(&zmq_msg);
151    return true;
152}
153
154/**
155 * stops the ZMQ based RPC server
156 *
157 */
158void TrexRpcServerReqRes::_stop_rpc_thread() {
159    /* by calling zmq_term we signal the blocked thread to exit */
160    if (m_context) {
161        zmq_term(m_context);
162    }
163
164}
165
166
167/**
168 * handles a request given to the server
169 * respondes to the request
170 */
171void TrexRpcServerReqRes::handle_request(const std::string &request) {
172    std::string response;
173
174    if ( request.size() > MAX_RPC_MSG_LEN ) {
175        std::string err_msg = "Request is too large (" + std::to_string(request.size()) + " bytes). Consider splitting to smaller chunks.";
176        TrexJsonRpcV2Parser::generate_common_error(response, err_msg);
177    } else {
178        process_request(request, response);
179    }
180
181    zmq_send(m_socket, response.c_str(), response.size(), 0);
182}
183
184void TrexRpcServerReqRes::process_request(const std::string &request, std::string &response) {
185
186    if (TrexRpcZip::is_compressed(request)) {
187        process_zipped_request(request, response);
188    } else {
189        process_request_raw(request, response);
190    }
191
192}
193
194/**
195 * main processing of the request
196 *
197 */
198void TrexRpcServerReqRes::process_request_raw(const std::string &request, std::string &response) {
199
200    std::vector<TrexJsonRpcV2ParsedObject *> commands;
201
202    Json::FastWriter writer;
203    Json::Value response_json;
204
205    /* first parse the request using JSON RPC V2 parser */
206    TrexJsonRpcV2Parser rpc_request(request);
207    rpc_request.parse(commands);
208
209    int index = 0;
210
211    /* for every command parsed - launch it */
212    for (auto command : commands) {
213        Json::Value single_response;
214
215        /* the command itself should be protected */
216        std::unique_lock<std::mutex> lock(*m_lock);
217        command->execute(single_response);
218        lock.unlock();
219
220        delete command;
221
222        response_json[index++] = single_response;
223
224        /* batch is like getting all the messages one by one - it should not be considered as stuck thread */
225        /* need to think if this is a good thing */
226        //m_monitor.tickle();
227    }
228
229    /* write the JSON to string and sever on ZMQ */
230
231    if (response.size() == 1) {
232        response = writer.write(response_json[0]);
233    } else {
234        response = writer.write(response_json);
235    }
236
237    verbose_json("Server Replied:  ", response);
238
239}
240
241void TrexRpcServerReqRes::process_zipped_request(const std::string &request, std::string &response) {
242    std::string unzipped;
243
244    /* try to uncomrpess - if fails, last shot is the JSON RPC */
245    bool rc = TrexRpcZip::uncompress(request, unzipped);
246    if (!rc) {
247        return process_request_raw(request, response);
248    }
249
250    /* process the request */
251    std::string raw_response;
252    if ( unzipped.size() > MAX_RPC_MSG_LEN ) {
253        std::string err_msg = "Request is too large (" + std::to_string(unzipped.size()) + " bytes). Consider splitting to smaller chunks.";
254        TrexJsonRpcV2Parser::generate_common_error(raw_response, err_msg);
255    } else {
256        process_request_raw(unzipped, raw_response);
257    }
258
259    TrexRpcZip::compress(raw_response, response);
260
261}
262
263/**
264 * handles a server error
265 *
266 */
267void
268TrexRpcServerReqRes::handle_server_error(const std::string &specific_err) {
269    std::string response;
270
271    /* generate error */
272    TrexJsonRpcV2Parser::generate_common_error(response, specific_err);
273
274    verbose_json("Server Replied:  ", response);
275
276    zmq_send(m_socket, response.c_str(), response.size(), 0);
277}
278
279
280
281std::string
282TrexRpcServerReqRes::test_inject_request(const std::string &req) {
283    std::string response;
284
285    process_request(req, response);
286
287    return response;
288}
289
290