trex_stl_jsonrpc_client.py revision acf815db
1#!/router/bin/python
2
3import zmq
4import json
5import re
6from collections import namedtuple
7import zlib
8import struct
9
10from .trex_stl_types import *
11from .utils.common import random_id_gen
12from .utils.zipmsg import ZippedMsg
13from threading import Lock
14
class bcolors:
    """ANSI escape sequences used to decorate text written to a terminal."""

    # text attributes
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # foreground colors
    RED = '\033[31m'
    GREEN = '\033[32m'
    MAGENTA = '\033[35m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'

    # terminates a colored / decorated section
    ENDC = '\033[0m'
24
25# sub class to describe a batch
class BatchMessage(object):
    """Collects JSON-RPC requests and sends them to the server as a batch.

    Requests are queued with add() and transmitted with invoke() through the
    RPC client that created the batch.
    """

    def __init__ (self, rpc_client):
        """rpc_client - the client used to build and to send the messages."""
        self.rpc_client = rpc_client
        self.batch_list = []

    def add (self, method_name, params = None, api_class = 'core'):
        """Build a request (left as a dict, not encoded) and queue it."""
        # the generated message id is not needed here, only the message itself
        _, msg = self.rpc_client.create_jsonrpc_v2(method_name, params, api_class, encode = False)
        self.batch_list.append(msg)

    def invoke(self, block = False, chunk_size = 500000, retry = 0):
        """Send every queued request and return the server's answer.

        block      - accepted for backward compatibility; not used
        chunk_size - approximate upper bound, in characters of encoded JSON,
                     for a single transmission; a chunk is flushed once it
                     grows past this size.  A false value (0 / None) sends
                     the whole batch in one message
        retry      - forwarded to the client's send_msg() for every message

        Returns RC_ERR when the client is not connected.  With chunking an RC
        holding the result of every chunk sent is returned, otherwise
        whatever send_msg() returns for the single message.
        """
        if not self.rpc_client.connected:
            return RC_ERR("Not connected to server")

        if not chunk_size:
            return self.rpc_client.send_msg(json.dumps(self.batch_list), retry = retry)

        response_batch = RC()
        size = 0
        pending = []
        for msg in self.batch_list:
            size += len(json.dumps(msg))
            pending.append(msg)
            if size > chunk_size:
                response_batch.add(self.rpc_client.send_msg(json.dumps(pending), retry = retry))
                size = 0
                pending = []
                # a failed exchange disconnects the client; sending the
                # remaining chunks on the closed socket would only raise
                if not self.rpc_client.connected:
                    return response_batch

        if pending:
            response_batch.add(self.rpc_client.send_msg(json.dumps(pending), retry = retry))

        return response_batch
59
60
61# JSON RPC v2.0 client
class JsonRpcClient(object):
    """JSON-RPC v2.0 client that talks to the server over a ZMQ REQ socket.

    Requests are built by create_jsonrpc_v2(), encoded, passed through the
    ZippedMsg helper when it asks for compression, and exchanged with the
    server one at a time.  Results are reported through the RC / RC_OK /
    RC_ERR helpers.
    """

    def __init__ (self, default_server, default_port, client):
        """Store the connection defaults; nothing is opened until connect().

        default_server / default_port - used by connect() when it is called
                                        without explicit values
        client - owner object; its 'api_h' and 'logger' attributes are used
        """
        self.client_api = client.api_h
        self.logger = client.logger
        self.connected = False

        # default values
        self.port   = default_port
        self.server = default_server

        self.id_gen = random_id_gen()
        self.zipper = ZippedMsg()

        # serializes the request/response exchanges (see send_msg)
        self.lock = Lock()

    def get_connection_details (self):
        """Return a dict with the current 'server' and 'port'."""
        return {'server': self.server, 'port': self.port}

    # pretty print for JSON
    def pretty_json (self, json_str, use_colors = True):
        """Re-format a JSON string with indentation and sorted keys.

        json_str   - a valid JSON document (ValueError is raised otherwise)
        use_colors - when true, numbers and strings that appear as values
                     are wrapped in ANSI color sequences
        """
        pretty_str = json.dumps(json.loads(json_str), indent = 4, separators=(',', ': '), sort_keys = True)

        if not use_colors:
            return pretty_str

        try:
            # int numbers - the look-ahead keeps the character that follows
            # the number out of the colored span and prevents the integer
            # part of a float from being colored on its own
            pretty_str = re.sub(r'([ ]*:[ ]+)(\-?[0-9]+)(?![0-9.eE])',r'\1{0}\2{1}'.format(bcolors.BLUE, bcolors.ENDC), pretty_str)
            # float - a fraction and/or an exponent is required
            pretty_str = re.sub(r'([ ]*:[ ]+)(\-?[0-9]+(?:\.[0-9]+(?:[eE][\-+]?[0-9]+)?|[eE][\-+]?[0-9]+))',r'\1{0}\2{1}'.format(bcolors.MAGENTA, bcolors.ENDC), pretty_str)
            # strings
            pretty_str = re.sub(r'([ ]*:[ ]+)("[^"]*")',r'\1{0}\2{1}'.format(bcolors.RED, bcolors.ENDC), pretty_str)
            # single quoted sections; RED is restored afterwards
            pretty_str = re.sub(r"('[^']*')", r'{0}\1{1}'.format(bcolors.MAGENTA, bcolors.RED), pretty_str)
        except re.error:
            # coloring is best effort only - keep whatever was produced so far
            pass

        return pretty_str

    def verbose_msg (self, msg):
        """Log 'msg' at the logger's VERBOSE_HIGH level."""
        self.logger.log("\n\n[verbose] " + msg, level = self.logger.VERBOSE_HIGH)


    # batch messages
    def create_batch (self):
        """Return a new, empty BatchMessage bound to this client."""
        return BatchMessage(self)

    def create_jsonrpc_v2 (self, method_name, params = None, api_class = 'core', encode = True):
        """Build a JSON-RPC v2.0 request.

        method_name - name of the remote method
        params      - parameters of the call (None means no parameters);
                      the object passed by the caller is never modified
        api_class   - when not empty, the matching handler from the client's
                      'api_h' table is added to the parameters as 'api_h'
        encode      - True to get the message as a JSON string, False to
                      get it as a dict

        Returns a tuple (message id, message).
        """
        msg_id = next(self.id_gen)

        if params is None:
            params = {}

        # if this RPC has an API class - add its handler (on a copy, so the
        # caller's dict is left untouched)
        if api_class:
            params = dict(params)
            params["api_h"] = self.client_api[api_class]

        msg = {}
        msg["jsonrpc"] = "2.0"
        msg["method"]  = method_name
        msg["id"]      = msg_id
        msg["params"]  = params

        if encode:
            return msg_id, json.dumps(msg)
        else:
            return msg_id, msg


    def invoke_rpc_method (self, method_name, params = None, api_class = 'core', retry = 0):
        """Build a single request, send it and return the processed result.

        Returns RC_ERR when the client is not connected.
        """
        if not self.connected:
            return RC_ERR("Not connected to server")

        _, msg = self.create_jsonrpc_v2(method_name, params, api_class)

        return self.send_msg(msg, retry = retry)


    def send_msg (self, msg, retry = 0):
        """Send an encoded JSON message and return the processed response."""
        # REQ/RESP pattern in ZMQ requires no interrupts during the send
        with self.lock:
            return self.__send_msg(msg, retry)


    def __send_msg (self, msg, retry = 0):
        """Do the actual exchange; the caller is expected to hold self.lock."""
        # print before
        if self.logger.check_verbose(self.logger.VERBOSE_HIGH):
            self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n")

        # encode string to buffer
        buffer = msg.encode()

        if self.zipper.check_threshold(buffer):
            buffer = self.zipper.compress(buffer)

        response = self.send_raw_msg(buffer, retry = retry)

        # a false value is returned to the caller as is (send_raw_msg gives
        # back an RC_ERR when the exchange failed)
        if not response:
            return response

        if self.zipper.is_compressed(response):
            response = self.zipper.decompress(response)

        # return to string and parse (batch and regular); both steps are
        # guarded so a garbled response is reported and not raised
        raw_response = response
        try:
            response = raw_response.decode()
            response_json = json.loads(response)
        except (TypeError, ValueError):
            if self.logger.check_verbose(self.logger.VERBOSE_HIGH):
                self.verbose_msg("Server Response (not valid JSON):\n\n" + repr(raw_response) + "\n")
            return RC_ERR("*** [RPC] - Failed to decode response from server")

        # print after
        if self.logger.check_verbose(self.logger.VERBOSE_HIGH):
            self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")

        if isinstance(response_json, list):
            return self.process_batch_response(response_json)
        else:
            return self.process_single_response(response_json)


    # runs one socket operation, retrying when it times out
    def _zmq_call_with_retry (self, zmq_call, retry, err_msg):
        """Call zmq_call() until it succeeds or the retries are used up.

        zmq_call - callable without arguments performing the socket operation
        retry    - number of extra attempts after a zmq.Again
        err_msg  - text of the RC_ERR produced when all attempts failed

        Returns (True, result of zmq_call) on success.  After the last
        failed attempt the client is disconnected and (False, RC_ERR) is
        returned.  A KeyboardInterrupt is re-raised after reconnecting.
        """
        retry_left = retry
        while True:
            try:
                return True, zmq_call()
            except zmq.Again:
                retry_left -= 1
                if retry_left < 0:
                    self.disconnect()
                    return False, RC_ERR(err_msg)

            except KeyboardInterrupt:
                # must restore the socket to a sane state
                self.reconnect()
                raise


    # low level send of string message
    def send_raw_msg (self, msg, retry = 0):
        """Send a raw buffer and wait for the raw response.

        Returns what the socket received, or an RC_ERR when sending or
        receiving kept timing out (the client is disconnected in that case).
        """
        ok, result = self._zmq_call_with_retry(lambda: self.socket.send(msg),
                                               retry,
                                               "*** [RPC] - Failed to send message to server")
        if not ok:
            return result

        ok, result = self._zmq_call_with_retry(lambda: self.socket.recv(),
                                               retry,
                                               "*** [RPC] - Failed to get server response from {0}".format(self.transport))
        return result


    # processs a single response from server
    def process_single_response (self, response_json):
        """Translate one decoded JSON-RPC response into RC_OK / RC_ERR."""

        # anything that is not a JSON-RPC 2.0 object is malformed
        if not isinstance(response_json, dict) or response_json.get("jsonrpc") != "2.0":
            return RC_ERR("Malformed Response ({0})".format(str(response_json)))

        # error reported by server
        if "error" in response_json:
            error = response_json["error"]
            if isinstance(error, dict):
                if "specific_err" in error:
                    return RC_ERR(error["specific_err"])
                if "message" in error:
                    return RC_ERR(error["message"])

            # an error without any usable description
            return RC_ERR("Malformed Response ({0})".format(str(response_json)))

        # if no error there should be a result
        if "result" not in response_json:
            return RC_ERR("Malformed Response ({0})".format(str(response_json)))

        return RC_OK(response_json["result"])


    # process a batch response
    def process_batch_response (self, response_json):
        """Process every response of a batch and collect them in one RC."""
        rc_batch = RC()

        for single_response in response_json:
            rc_batch.add(self.process_single_response(single_response))

        return rc_batch


    def disconnect (self):
        """Close the socket and the context; RC_ERR when not connected."""
        if self.connected:
            self.socket.close(linger = 0)
            self.context.destroy(linger = 0)
            self.connected = False
            return RC_OK()
        else:
            return RC_ERR("Not connected to server")


    def connect(self, server = None, port = None):
        """(Re)connect to the server.

        server / port - when given (and not empty) they replace the stored
                        values, which are used otherwise

        Returns RC_OK, or RC_ERR when ZMQ rejects the address.
        """
        if self.connected:
            self.disconnect()

        self.context = zmq.Context()

        self.server = (server if server else self.server)
        self.port = (port if port else self.port)

        #  Socket to talk to server
        self.transport = "tcp://{0}:{1}".format(self.server, self.port)

        self.socket = self.context.socket(zmq.REQ)
        try:
            self.socket.connect(self.transport)
        except zmq.error.ZMQError as e:
            # do not leave the half built socket / context behind
            self.socket.close(linger = 0)
            self.context.destroy(linger = 0)
            return RC_ERR("ZMQ Error: Bad server or port name: " + str(e))

        # ZMQ send / receive timeouts are given in milliseconds
        self.socket.setsockopt(zmq.SNDTIMEO, 10000)
        self.socket.setsockopt(zmq.RCVTIMEO, 10000)

        self.connected = True

        return RC_OK()


    def reconnect(self):
        """Connect again using the stored server and port."""
        # connect using current values
        return self.connect()


    def is_connected(self):
        """Return True while the client considers itself connected."""
        return self.connected

    def __del__(self):
        # the object may be only partially built if __init__ raised, so
        # every attribute is looked up defensively
        logger = getattr(self, "logger", None)
        if logger is not None:
            logger.log("Shutting down RPC client\n")

        context = getattr(self, "context", None)
        if context is not None:
            context.destroy(linger=0)
306
307