trex_capture.py revision 951b09ef
1from trex_stl_lib.api import *
2from trex_stl_lib.utils import parsing_opts, text_tables
3import threading
4import tempfile
5
class CaptureMonitorWriter(object):
    """Interface for the sinks a capture monitor can deliver packets to.

    Concrete writers override all three hooks; every hook here raises
    NotImplementedError.
    """

    def init(self):
        """Prepare the sink before any packet is delivered."""
        raise NotImplementedError()

    def deinit(self):
        """Release whatever init() acquired."""
        raise NotImplementedError()

    def handle_pkts(self, pkts):
        """Consume one batch of captured packets."""
        raise NotImplementedError()
15
16
class CaptureMonitorWriterStdout(CaptureMonitorWriter):
    """Writer that reports every captured packet through the console logger.

    In brief mode each packet is logged as its scapy command() string;
    otherwise scapy's show() output is used.
    """

    def __init__(self, logger, is_brief):
        self.logger = logger
        self.is_brief = is_brief

        # running totals of what this writer has handled
        self.pkt_count = 0
        self.byte_count = 0

        # direction markers printed next to the port number
        self.RX_ARROW = u'\u25c0\u2500\u2500'
        self.TX_ARROW = u'\u25b6\u2500\u2500'

    def init(self):
        """Log the start-up banner."""
        banner = (
            ("\nStarting capture monitor on selected ports", ('bold',)),
            ("*** any captured packet will be displayed on screen ***\n", ()),
            ("('capture monitor stop' to abort capturing...)\n", ('bold',)),
        )
        for text, attrs in banner:
            self.logger.log(format_text(text, *attrs))

    def deinit(self):
        """Nothing to release for the console writer."""
        return None

    def handle_pkts(self, pkts):
        """Log each packet of the batch, then redraw the console prompt."""
        for single in pkts:
            self.__handle_pkt(single)

        self.logger.prompt_redraw()

    def get_scapy_name(self, pkt_scapy):
        """Return the name of the innermost layer that is not 'Padding'/'Raw'."""
        current = pkt_scapy
        while True:
            inner = current.payload
            # stop at the last layer, or just before a padding / raw payload
            if not inner or inner.name in ('Padding', 'Raw'):
                return current.name
            current = inner

    def format_origin(self, origin):
        """Return 'RX' / 'TX' prefixed by its arrow; anything else is returned as text."""
        arrows = {'RX': self.RX_ARROW, 'TX': self.TX_ARROW}
        if origin in arrows:
            return u'{0} {1}'.format(arrows[origin], origin)

        return '{0}'.format(origin)

    def __handle_pkt(self, pkt):
        """Decode a single packet record, update the totals and log it."""
        raw = base64.b64decode(pkt['binary'])
        size = len(raw)

        self.pkt_count += 1
        self.byte_count += size

        parsed = Ether(raw)

        header = u'\n\nPort: {0} {1}\n'.format(pkt['port'], self.format_origin(pkt['origin']))
        self.logger.log(format_text(header, 'bold', ''))

        summary = '    Type: {:}, Size: {:} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(parsed), size, pkt['ts'])
        self.logger.log(format_text(summary, 'bold'))

        if not self.is_brief:
            # verbose mode: scapy prints the layer-by-layer dump itself
            parsed.show(label_lvl = '    ')
            self.logger.log('')
            return

        self.logger.log('    {0}'.format(parsed.command()))
76
77
78#
class CaptureMonitorWriterPipe(CaptureMonitorWriter):
    """Writer that streams captured packets to Wireshark over a named pipe.

    init() creates a FIFO, waits for a reader to connect and writes the PCAP
    header. handle_pkts() appends packets to it. deinit() closes everything
    and removes the FIFO.
    """

    def __init__(self, logger):
        self.logger = logger

        # set here so that deinit() is safe even if init() never ran or failed
        self.fifo_name = None
        self.fifo = None
        self.writer = None
        self.pipe_broken = False

    def init(self):
        """Create the FIFO and block until a reader opens it.

        Raises:
            STLError: the FIFO could not be created or opened, or the wait
                was interrupted from the keyboard.
        """
        self.pipe_broken = False
        self.fifo_name = tempfile.mktemp()

        try:
            os.mkfifo(self.fifo_name)
        except OSError as e:
            # nothing was created, so there is nothing to unlink
            failed_name = self.fifo_name
            self.fifo_name = None
            raise STLError("failed to create pipe {0}\n{1}".format(failed_name, str(e)))

        try:
            self.logger.log(format_text("\nPlease run 'wireshark -k -i {0}'".format(self.fifo_name), 'bold'))
            self.logger.log('\nWaiting for Wireshark connection...')

            # opening a FIFO write-only blocks until a reader opens the other end
            self.fifo = os.open(self.fifo_name, os.O_WRONLY)
            self.logger.log('Successfuly connected to Wireshark...')
            self.logger.log(format_text('\n*** Capture monitoring started ***\n', 'bold'))

            self.writer = RawPcapWriter(self.fifo_name, linktype = 1, sync = True)
            self.writer._write_header(None)

        except KeyboardInterrupt:
            self.deinit()
            raise STLError("*** pipe monitor aborted...cleaning up")

        except (OSError, IOError) as e:
            failed_name = self.fifo_name
            self.deinit()
            raise STLError("failed to create pipe {0}\n{1}".format(failed_name, str(e)))

    def deinit(self):
        """Close the PCAP writer and the pipe descriptor and remove the FIFO.

        Cleanup is best effort and may be called more than once.
        """
        if self.writer is not None:
            try:
                self.writer.close()
            except (IOError, OSError):
                # the reader may already be gone; nothing more to do
                pass
            self.writer = None

        if self.fifo is not None:
            try:
                os.close(self.fifo)
            except OSError:
                pass
            self.fifo = None

        if self.fifo_name is not None:
            try:
                os.unlink(self.fifo_name)
            except OSError:
                # already removed
                pass
            self.fifo_name = None

    def handle_pkts(self, pkts):
        """Write a batch of packets to the pipe.

        When the reader closes the pipe a message is logged once and this and
        every later batch is dropped.
        """
        if self.pipe_broken:
            return

        for pkt in pkts:
            pkt_bin = base64.b64decode(pkt['binary'])
            try:
                self.writer._write_packet(pkt_bin, sec = 0, usec = 0)
            except IOError:
                self.pipe_broken = True
                self.logger.log(format_text("\n*** pipe has been closed by the reader - captured packets are no longer forwarded ***\n", 'bold'))
                return
119
120
121
class CaptureMonitor(object):
    """Live capture monitor.

    Starts a capture on the server and polls it from a background thread,
    handing each fetched batch to a writer (console or pipe). The thread
    takes `cmd_lock` around every request it sends through the client.
    """

    def __init__(self, client, cmd_lock):
        self.client = client
        self.cmd_lock = cmd_lock
        self.active = False
        self.capture_id = -1
        self.logger = client.logger

        # defined up front so get_mon_row() never hits a missing attribute
        self.pkt_count = 0
        self.byte_count = 0
        self.tx_port_list = None
        self.rx_port_list = None
        self.writer = None
        self.t = None

    def is_active(self):
        """Return True while the polling thread is meant to be running."""
        return self.active

    def get_capture_id(self):
        """Return the ID of the monitor's capture, or -1 when there is none."""
        return self.capture_id

    def start(self, tx_port_list, rx_port_list, rate_pps, mon_type):
        """Start monitoring the given ports.

        Args:
            tx_port_list: ports whose TX side is captured.
            rx_port_list: ports whose RX side is captured.
            rate_pps: passed to the client as the capture `limit`.
            mon_type: 'compact', 'verbose' or 'pipe'.

        Raises:
            STLError: unknown `mon_type`; errors from the client or from the
                writer's init() propagate as well.
        """
        # stop any previous monitors
        if self.active:
            self.stop()

        if mon_type == 'compact':
            self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = True)
        elif mon_type == 'verbose':
            self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = False)
        elif mon_type == 'pipe':
            self.writer = CaptureMonitorWriterPipe(self.logger)
        else:
            raise STLError('unknown writer type')

        with self.logger.supress():
            self.capture_id = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps)

        self.tx_port_list = tx_port_list
        self.rx_port_list = rx_port_list
        self.pkt_count = 0
        self.byte_count = 0

        try:
            self.writer.init()
        except Exception:
            # do not leave the capture running when the writer cannot start
            self.__abort_capture()
            raise

        self.t = threading.Thread(target = self.__thread_cb)
        self.t.daemon = True

        try:
            self.active = True
            self.t.start()
        except Exception:
            self.active = False
            raise

    def stop(self):
        """Stop the polling thread, remove the capture and release the writer."""
        if self.active:
            self.active = False
            self.t.join()

            try:
                self.client.stop_capture(self.capture_id, None)
            finally:
                # release the writer even when the client call fails
                self.capture_id = -1
                self.writer.deinit()

    def get_mon_row(self):
        """Return the monitor's table row, or None when it is not active."""
        if not self.is_active():
            return None

        tx_ports = ', '.join(str(x) for x in self.tx_port_list) if self.tx_port_list else '-'
        rx_ports = ', '.join(str(x) for x in self.rx_port_list) if self.rx_port_list else '-'

        return [self.capture_id,
                self.pkt_count,
                format_num(self.byte_count, suffix = 'B'),
                tx_ports,
                rx_ports]

    def __abort_capture(self):
        """Remove a capture that was started but will not be monitored."""
        with self.logger.supress():
            self.client.stop_capture(self.capture_id, None)
        self.capture_id = -1

    def __update_counters(self, pkts):
        """Add a fetched batch to the packet / byte totals."""
        import base64

        self.pkt_count += len(pkts)
        self.byte_count += sum(len(base64.b64decode(pkt['binary'])) for pkt in pkts)

    # sleeps with high freq checks for active
    def __sleep(self):
        for _ in range(5):
            if not self.active:
                return False

            time.sleep(0.1)

        return True

    def __lock(self):
        """Poll for the command lock; give up (False) once the monitor stops."""
        while True:
            if self.cmd_lock.acquire(False):
                return True

            if not self.active:
                return False
            time.sleep(0.1)

    def __unlock(self):
        self.cmd_lock.release()

    def __thread_cb(self):
        """Polling loop: fetch up to 10 packets per request until stopped."""
        while self.active:
            # sleep
            if not self.__sleep():
                break

            # try to lock
            if not self.__lock():
                break

            try:
                rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10})
                if not rc:
                    raise STLError(rc)
            finally:
                self.__unlock()

            pkts = rc.data()['pkts']
            if not pkts:
                continue

            self.__update_counters(pkts)
            self.writer.handle_pkts(pkts)
245
246
247
248
249# main class
class CaptureManager(object):
    """Console front end for the 'capture' command.

    Builds the argument parsers for the 'record', 'monitor', 'show' and
    'clear' sub-commands and dispatches each parsed line to its handler.
    """

    def __init__(self, client, cmd_lock):
        self.c = client
        self.cmd_lock = cmd_lock
        self.monitor = CaptureMonitor(client, cmd_lock)
        self.logger = client.logger

        # install parsers
        self.parser = parsing_opts.gen_parser(self, "capture", self.parse_line_internal.__doc__)
        self.subparsers = self.parser.add_subparsers(title = "commands", dest="commands")

        self.install_record_parser()
        self.install_monitor_parser()

        # show
        self.show_parser = self.subparsers.add_parser('show', help = "show all active captures")

        # reset
        self.clear_parser = self.subparsers.add_parser('clear', help = "remove all active captures")

        # register handlers
        self.cmds = {'record': self.parse_record,
                     'monitor': self.parse_monitor,
                     'clear': self.parse_clear,
                     'show': self.parse_show}

    def install_record_parser(self):
        """Register the 'record start' / 'record stop' parsers."""
        self.record_parser = self.subparsers.add_parser('record', help = "PCAP recording")
        record_sub = self.record_parser.add_subparsers(title = 'commands', dest = 'record_cmd')
        self.record_start_parser = record_sub.add_parser('start', help = "starts a new buffered capture")
        self.record_stop_parser = record_sub.add_parser('stop', help = "stops an active buffered capture")

        # start
        self.record_start_parser.add_arg_list(parsing_opts.TX_PORT_LIST,
                                              parsing_opts.RX_PORT_LIST,
                                              parsing_opts.LIMIT)

        # stop
        self.record_stop_parser.add_arg_list(parsing_opts.CAPTURE_ID,
                                             parsing_opts.OUTPUT_FILENAME)

    def install_monitor_parser(self):
        """Register the 'monitor start' / 'monitor stop' parsers."""
        self.monitor_parser = self.subparsers.add_parser('monitor', help = 'live monitoring')
        monitor_sub = self.monitor_parser.add_subparsers(title = 'commands', dest = 'mon_cmd')
        self.monitor_start_parser = monitor_sub.add_parser('start', help = 'starts a monitor')
        self.monitor_stop_parser = monitor_sub.add_parser('stop', help = 'stops an active monitor')

        self.monitor_start_parser.add_arg_list(parsing_opts.TX_PORT_LIST,
                                               parsing_opts.RX_PORT_LIST,
                                               parsing_opts.MONITOR_TYPE)

    def stop(self):
        """Stop the live monitor, if one is running."""
        self.monitor.stop()

    # main entry point for parsing commands from console
    def parse_line(self, line):
        """Parse and run one console line; log STLError and return RC_ERR on failure."""
        try:
            self.parse_line_internal(line)
        except STLError as e:
            self.logger.log("\nAction has failed with the following error:\n" + format_text(e.brief() + "\n", 'bold'))
            return RC_ERR(e.brief())

    def parse_line_internal(self, line):
        '''Manage PCAP recorders'''
        # NOTE: the docstring above is passed to gen_parser() in __init__ as
        # the parser description - keep it unchanged.

        # an empty or whitespace-only line defaults to 'show'
        tokens = line.split() if line else []
        if not tokens:
            tokens = ['show']

        opts = self.parser.parse_args(tokens)
        if not opts:
            return opts

        # call the handler
        self.cmds[opts.commands](opts)

    # record methods
    def parse_record(self, opts):
        """Dispatch 'record start' / 'record stop'.

        Raises:
            STLError: the sub-command is missing or unknown.
        """
        handlers = {'start': self.parse_record_start,
                    'stop': self.parse_record_stop}

        handler = handlers.get(opts.record_cmd)
        if handler is None:
            # raised (not asserted) so parse_line() can report it
            raise STLError("unknown 'capture record' command: '{0}'".format(opts.record_cmd))

        handler(opts)

    def parse_record_start(self, opts):
        """Start a buffered capture on the requested TX / RX ports."""
        if not opts.tx_port_list and not opts.rx_port_list:
            self.record_start_parser.formatted_error('please provide either --tx or --rx')
            return

        self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit)

    def parse_record_stop(self, opts):
        """Stop a buffered capture; refuses the monitor's own capture ID."""
        captures = self.c.get_capture_status()
        ids = [c['id'] for c in captures]

        if opts.capture_id == self.monitor.get_capture_id():
            self.record_stop_parser.formatted_error("'{0}' is a monitor, please use 'capture monitor stop'".format(opts.capture_id))
            return

        if opts.capture_id not in ids:
            self.record_stop_parser.formatted_error("'{0}' is not an active capture ID".format(opts.capture_id))
            return

        self.c.stop_capture(opts.capture_id, opts.output_filename)

    # monitor methods
    def parse_monitor(self, opts):
        """Dispatch 'monitor start' / 'monitor stop'.

        Raises:
            STLError: the sub-command is missing or unknown.
        """
        handlers = {'start': self.parse_monitor_start,
                    'stop': self.parse_monitor_stop}

        handler = handlers.get(opts.mon_cmd)
        if handler is None:
            # raised (not asserted) so parse_line() can report it
            raise STLError("unknown 'capture monitor' command: '{0}'".format(opts.mon_cmd))

        handler(opts)

    def parse_monitor_start(self, opts):
        """Restart the live monitor; 'verbose' wins over 'pipe', default is 'compact'."""
        mon_type = 'compact'

        if opts.verbose:
            mon_type = 'verbose'
        elif opts.pipe:
            mon_type = 'pipe'

        if not opts.tx_port_list and not opts.rx_port_list:
            self.monitor_start_parser.formatted_error('please provide either --tx or --rx')
            return

        self.monitor.stop()
        self.monitor.start(opts.tx_port_list, opts.rx_port_list, 100, mon_type)

    def parse_monitor_stop(self, opts):
        """Stop the live monitor."""
        self.monitor.stop()

    def parse_clear(self, opts):
        """Stop the monitor and remove every capture through the client."""
        self.monitor.stop()
        self.c.remove_all_captures()

    def parse_show(self, opts):
        """Print the recorder table and the monitor table (each only if non-empty)."""
        data = self.c.get_capture_status()

        # captures
        cap_table = text_tables.TRexTextTable()
        cap_table.set_cols_align(["c"] * 6)
        cap_table.set_cols_width([15] * 6)

        # monitor
        mon_table = text_tables.TRexTextTable()
        mon_table.set_cols_align(["c"] * 5)
        mon_table.set_cols_width([15] * 5)

        for elem in data:
            capture_id = elem['id']

            # the monitor's own capture goes to the monitor table
            if self.monitor.get_capture_id() == capture_id:
                row = self.monitor.get_mon_row()
                mon_table.add_rows([row], header=False)
                continue

            row = [capture_id,
                   format_text(elem['state'], 'bold'),
                   '[{0}/{1}]'.format(elem['count'], elem['limit']),
                   format_num(elem['bytes'], suffix = 'B'),
                   bitfield_to_str(elem['filter']['tx']),
                   bitfield_to_str(elem['filter']['rx'])]

            cap_table.add_rows([row], header=False)

        cap_table.header(['ID', 'Status', 'Packets', 'Bytes', 'TX Ports', 'RX Ports'])
        mon_table.header(['ID', 'Packets Seen', 'Bytes Seen', 'TX Ports', 'RX Ports'])

        if cap_table._rows:
            text_tables.print_table_with_header(cap_table, '\nActive Recorders')

        if mon_table._rows:
            text_tables.print_table_with_header(mon_table, '\nActive Monitor')
437
438
439