trex_capture.py revision f5f92b06
1from trex_stl_lib.api import *
2from trex_stl_lib.utils import parsing_opts, text_tables
3import threading
4import tempfile
5import select
6
7class CaptureMonitorWriter(object):
8    def init (self, start_ts):
9        raise NotImplementedError
10
11    def deinit(self):
12        raise NotImplementedError
13
14    def handle_pkts (self, pkts):
15        raise NotImplementedError
16
17    def periodic_check (self):
18        raise NotImplementedError
19
20
21class CaptureMonitorWriterStdout(CaptureMonitorWriter):
22    def __init__ (self, logger, is_brief):
23        self.logger      = logger
24        self.is_brief    = is_brief
25
26        self.RX_ARROW = u'\u25c0\u2500\u2500'
27        self.TX_ARROW = u'\u25b6\u2500\u2500'
28
29    def init (self, start_ts):
30        self.start_ts = start_ts
31
32        self.logger.pre_cmd("Starting stdout capture monitor - verbose: '{0}'".format('low' if self.is_brief else 'high'))
33        self.logger.post_cmd(RC_OK)
34
35        self.logger.log(format_text("\n*** use 'capture monitor stop' to abort capturing... ***\n", 'bold'))
36
37
38    def deinit (self):
39        pass
40
41
42    def periodic_check (self):
43        return RC_OK()
44
45    def handle_pkts (self, pkts):
46        byte_count = 0
47
48        for pkt in pkts:
49            byte_count += self.__handle_pkt(pkt)
50
51        self.logger.prompt_redraw()
52        return RC_OK(byte_count)
53
54
55    def get_scapy_name (self, pkt_scapy):
56        layer = pkt_scapy
57        while layer.payload and layer.payload.name not in('Padding', 'Raw'):
58            layer = layer.payload
59
60        return layer.name
61
62
63    def format_origin (self, origin):
64        if origin == 'RX':
65            return u'{0} {1}'.format(self.RX_ARROW, 'RX')
66        elif origin == 'TX':
67            return u'{0} {1}'.format(self.TX_ARROW, 'TX')
68        else:
69            return '{0}'.format(origin)
70
71
72    def __handle_pkt (self, pkt):
73        pkt_bin = base64.b64decode(pkt['binary'])
74
75        pkt_scapy = Ether(pkt_bin)
76        self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(pkt['index'], pkt['port'], self.format_origin(pkt['origin'])), 'bold', ''))
77        self.logger.log(format_text('    Type: {}, Size: {} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts'] - self.start_ts), 'bold'))
78
79
80        if self.is_brief:
81            self.logger.log('    {0}'.format(pkt_scapy.command()))
82        else:
83            pkt_scapy.show(label_lvl = '    ')
84            self.logger.log('')
85
86        return len(pkt_bin)
87
88#
89class CaptureMonitorWriterPipe(CaptureMonitorWriter):
90    def __init__ (self, logger):
91        self.logger   = logger
92
93    def init (self, start_ts):
94        self.start_ts  = start_ts
95        self.fifo_name = tempfile.mktemp()
96
97        try:
98            self.logger.pre_cmd('Starting pipe capture monitor')
99            os.mkfifo(self.fifo_name)
100            self.logger.post_cmd(RC_OK)
101
102            self.logger.log(format_text("*** Please run 'wireshark -k -i {0}' ***".format(self.fifo_name), 'bold'))
103
104            self.logger.pre_cmd("Waiting for Wireshark pipe connection")
105            self.fifo = os.open(self.fifo_name, os.O_WRONLY)
106            self.logger.post_cmd(RC_OK())
107
108            self.logger.log(format_text('\n*** Capture monitoring started ***\n', 'bold'))
109
110            self.writer = RawPcapWriter(self.fifo_name, linktype = 1, sync = True)
111            self.writer._write_header(None)
112
113            # register a poller
114            self.poll = select.poll()
115            self.poll.register(self.fifo, select.EPOLLERR)
116
117        except KeyboardInterrupt as e:
118            self.logger.post_cmd(RC_ERR(""))
119            raise STLError("*** pipe monitor aborted...cleaning up")
120
121        except OSError as e:
122            self.logger.post_cmd(RC_ERR(""))
123            raise STLError("failed to create pipe {0}\n{1}".format(self.fifo_name, str(e)))
124
125
126    def deinit (self):
127        try:
128            os.unlink(self.fifo_name)
129        except OSError:
130            pass
131
132
133    def periodic_check (self):
134        return self.check_pipe()
135
136
137    def check_pipe (self):
138        if self.poll.poll(0):
139            return RC_ERR('*** pipe has been disconnected - aborting monitoring ***')
140
141        return RC_OK()
142
143
144    def handle_pkts (self, pkts):
145        rc = self.check_pipe()
146        if not rc:
147            return rc
148
149        byte_count = 0
150
151        for pkt in pkts:
152            pkt_bin = base64.b64decode(pkt['binary'])
153            ts      = pkt['ts']
154            sec     = int(ts)
155            usec    = int( (ts - sec) * 1e6 )
156
157            try:
158                self.writer._write_packet(pkt_bin, sec = sec, usec = usec)
159            except IOError:
160                return RC_ERR("*** failed to write packet to pipe ***")
161
162            byte_count += len(pkt_bin)
163
164        return RC_OK(byte_count)
165
166
167class CaptureMonitor(object):
168    def __init__ (self, client, cmd_lock):
169        self.client      = client
170        self.cmd_lock    = cmd_lock
171        self.active      = False
172        self.capture_id  = None
173        self.logger      = client.logger
174        self.writer      = None
175
176    def is_active (self):
177        return self.active
178
179
180    def get_capture_id (self):
181        return self.capture_id
182
183
184    def start (self, tx_port_list, rx_port_list, rate_pps, mon_type):
185        try:
186            self.start_internal(tx_port_list, rx_port_list, rate_pps, mon_type)
187        except Exception as e:
188            self.__stop()
189            raise e
190
191    def start_internal (self,  tx_port_list, rx_port_list, rate_pps, mon_type):
192        # stop any previous monitors
193        if self.active:
194            self.stop()
195
196        self.tx_port_list = tx_port_list
197        self.rx_port_list = rx_port_list
198
199        if mon_type == 'compact':
200            self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = True)
201        elif mon_type == 'verbose':
202            self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = False)
203        elif mon_type == 'pipe':
204            self.writer = CaptureMonitorWriterPipe(self.logger)
205        else:
206            raise STLError('unknown writer type')
207
208
209        with self.logger.supress():
210            data = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps)
211
212        self.capture_id = data['id']
213        self.start_ts   = data['ts']
214
215        self.writer.init(self.start_ts)
216
217
218        self.t = threading.Thread(target = self.__thread_cb)
219        self.t.setDaemon(True)
220
221        try:
222            self.active = True
223            self.t.start()
224        except Exception as e:
225            self.active = False
226            self.stop()
227            raise e
228
229    # entry point stop
230    def stop (self):
231
232        if self.active:
233            self.stop_logged()
234        else:
235            self.__stop()
236
237    # wraps stop with a logging
238    def stop_logged (self):
239        self.logger.pre_cmd("Stopping capture monitor")
240
241        try:
242            self.__stop()
243        except Exception as e:
244            self.logger.post_cmd(RC_ERR(""))
245            raise e
246
247        self.logger.post_cmd(RC_OK())
248
249    # internal stop
250    def __stop (self):
251
252        # shutdown thread
253        if self.active:
254            self.active = False
255            self.t.join()
256
257        # deinit the writer
258        if self.writer is not None:
259            self.writer.deinit()
260            self.writer = None
261
262        # cleanup capture ID if possible
263        if self.capture_id is None:
264            return
265
266        capture_id = self.capture_id
267        self.capture_id = None
268
269        # if we are disconnected - we cannot cleanup the capture
270        if not self.client.is_connected():
271            return
272
273        try:
274            captures = [x['id'] for x in self.client.get_capture_status()]
275            if capture_id not in captures:
276                return
277
278            with self.logger.supress():
279                self.client.stop_capture(capture_id)
280
281        except STLError as e:
282            self.logger.post_cmd(RC_ERR(""))
283            raise e
284
285
286    def get_mon_row (self):
287        if not self.is_active():
288            return None
289
290        return [self.capture_id,
291                self.pkt_count,
292                format_num(self.byte_count, suffix = 'B'),
293                ', '.join([str(x) for x in self.tx_port_list] if self.tx_port_list else '-'),
294                ', '.join([str(x) for x in self.rx_port_list] if self.rx_port_list else '-')
295                ]
296
297
298    # sleeps with high freq checks for active
299    def __sleep (self):
300        for _ in range(5):
301            if not self.active:
302                return False
303
304            time.sleep(0.1)
305
306        return True
307
308    def __lock (self):
309        while True:
310            rc = self.cmd_lock.acquire(False)
311            if rc:
312                return True
313
314            if not self.active:
315                return False
316            time.sleep(0.1)
317
318    def __unlock (self):
319        self.cmd_lock.release()
320
321
322    def __thread_cb (self):
323        try:
324            rc = self.__thread_main_loop()
325        finally:
326            pass
327
328        if not rc:
329            self.logger.log(str(rc))
330            self.logger.log(format_text('\n*** monitor is inactive - please restart the monitor ***\n', 'bold'))
331            self.logger.prompt_redraw()
332
333
334    def __thread_main_loop (self):
335        self.pkt_count  = 0
336        self.byte_count = 0
337
338        while self.active:
339
340            # sleep
341            if not self.__sleep():
342                break
343
344            # check that the writer is ok
345            rc = self.writer.periodic_check()
346            if not rc:
347                return rc
348
349            # try to lock
350            if not self.__lock():
351                break
352
353            try:
354                if not self.client.is_connected():
355                    return RC_ERR('*** client has been disconnected, aborting monitoring ***')
356                rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10})
357                if not rc:
358                    return rc
359
360            finally:
361                self.__unlock()
362
363
364            pkts = rc.data()['pkts']
365            if not pkts:
366                continue
367
368            rc = self.writer.handle_pkts(pkts)
369            if not rc:
370                return rc
371
372            self.pkt_count  += len(pkts)
373            self.byte_count += rc.data()
374
375        # graceful shutdown
376        return RC_OK()
377
378
379
380# main class
381class CaptureManager(object):
382    def __init__ (self, client, cmd_lock):
383        self.c          = client
384        self.cmd_lock   = cmd_lock
385        self.monitor    = CaptureMonitor(client, cmd_lock)
386        self.logger     = client.logger
387
388        # install parsers
389
390        self.parser = parsing_opts.gen_parser(self, "capture", self.parse_line_internal.__doc__)
391        self.subparsers = self.parser.add_subparsers(title = "commands", dest="commands")
392
393        self.install_record_parser()
394        self.install_monitor_parser()
395
396        # show
397        self.show_parser = self.subparsers.add_parser('show', help = "show all active captures")
398
399        # reset
400        self.clear_parser = self.subparsers.add_parser('clear', help = "remove all active captures")
401
402        # register handlers
403        self.cmds = {'record': self.parse_record, 'monitor' : self.parse_monitor, 'clear': self.parse_clear, 'show' : self.parse_show}
404
405
406    def install_record_parser (self):
407        # recording
408        self.record_parser = self.subparsers.add_parser('record', help = "PCAP recording")
409        record_sub = self.record_parser.add_subparsers(title = 'commands', dest = 'record_cmd')
410        self.record_start_parser = record_sub.add_parser('start', help = "starts a new buffered capture")
411        self.record_stop_parser  = record_sub.add_parser('stop', help = "stops an active buffered capture")
412
413        # start
414        self.record_start_parser.add_arg_list(parsing_opts.TX_PORT_LIST,
415                                              parsing_opts.RX_PORT_LIST,
416                                              parsing_opts.LIMIT)
417
418        # stop
419        self.record_stop_parser.add_arg_list(parsing_opts.CAPTURE_ID,
420                                             parsing_opts.OUTPUT_FILENAME)
421
422
423
424    def install_monitor_parser (self):
425        # monitor
426        self.monitor_parser = self.subparsers.add_parser('monitor', help = 'live monitoring')
427        monitor_sub = self.monitor_parser.add_subparsers(title = 'commands', dest = 'mon_cmd')
428        self.monitor_start_parser = monitor_sub.add_parser('start', help = 'starts a monitor')
429        self.monitor_stop_parser  = monitor_sub.add_parser('stop', help = 'stops an active monitor')
430
431        self.monitor_start_parser.add_arg_list(parsing_opts.TX_PORT_LIST,
432                                               parsing_opts.RX_PORT_LIST,
433                                               parsing_opts.MONITOR_TYPE)
434
435
436
437    def stop (self):
438        self.monitor.stop()
439
440
441    # main entry point for parsing commands from console
442    def parse_line (self, line):
443        try:
444            self.parse_line_internal(line)
445        except STLError as e:
446            self.logger.log("\nAction has failed with the following error:\n" + format_text(e.brief() + "\n", 'bold'))
447            return RC_ERR(e.brief())
448
449
450    def parse_line_internal (self, line):
451        '''Manage PCAP recorders'''
452
453        # default
454        if not line:
455            line = "show"
456
457        opts = self.parser.parse_args(line.split())
458        if not opts:
459            return opts
460
461        # call the handler
462        self.cmds[opts.commands](opts)
463
464
465    # record methods
466    def parse_record (self, opts):
467        if opts.record_cmd == 'start':
468            self.parse_record_start(opts)
469        elif opts.record_cmd == 'stop':
470            self.parse_record_stop(opts)
471        else:
472            self.record_parser.formatted_error("too few arguments")
473
474
475    def parse_record_start (self, opts):
476        if not opts.tx_port_list and not opts.rx_port_list:
477            self.record_start_parser.formatted_error('please provide either --tx or --rx')
478            return
479
480        rc = self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit)
481
482        self.logger.log(format_text("*** Capturing ID is set to '{0}' ***".format(rc['id']), 'bold'))
483        self.logger.log(format_text("*** Please call 'capture record stop --id {0} -o <out.pcap>' when done ***\n".format(rc['id']), 'bold'))
484
485
486    def parse_record_stop (self, opts):
487        captures = self.c.get_capture_status()
488        ids = [c['id'] for c in captures]
489
490        if opts.capture_id == self.monitor.get_capture_id():
491            self.record_stop_parser.formatted_error("'{0}' is a monitor, please use 'capture monitor stop'".format(opts.capture_id))
492            return
493
494        if opts.capture_id not in ids:
495            self.record_stop_parser.formatted_error("'{0}' is not an active capture ID".format(opts.capture_id))
496            return
497
498        self.c.stop_capture(opts.capture_id, opts.output_filename)
499
500
501    # monitor methods
502    def parse_monitor (self, opts):
503        if opts.mon_cmd == 'start':
504            self.parse_monitor_start(opts)
505        elif opts.mon_cmd == 'stop':
506            self.parse_monitor_stop(opts)
507        else:
508            self.monitor_parser.formatted_error("too few arguments")
509
510
511    def parse_monitor_start (self, opts):
512        mon_type = 'compact'
513
514        if opts.verbose:
515            mon_type = 'verbose'
516        elif opts.pipe:
517            mon_type = 'pipe'
518
519        if not opts.tx_port_list and not opts.rx_port_list:
520            self.monitor_start_parser.formatted_error('please provide either --tx or --rx')
521            return
522
523        self.monitor.stop()
524        self.monitor.start(opts.tx_port_list, opts.rx_port_list, 100, mon_type)
525
526    def parse_monitor_stop (self, opts):
527        self.monitor.stop()
528
529    def parse_clear (self, opts):
530        self.monitor.stop()
531        self.c.remove_all_captures()
532
533
534
535    def parse_show (self, opts):
536        data = self.c.get_capture_status()
537
538        # captures
539        cap_table = text_tables.TRexTextTable()
540        cap_table.set_cols_align(["c"] * 6)
541        cap_table.set_cols_width([15] * 6)
542
543        # monitor
544        mon_table = text_tables.TRexTextTable()
545        mon_table.set_cols_align(["c"] * 5)
546        mon_table.set_cols_width([15] * 5)
547
548        for elem in data:
549            id = elem['id']
550
551            if self.monitor.get_capture_id() == id:
552                row = self.monitor.get_mon_row()
553                mon_table.add_rows([row], header=False)
554
555            else:
556                row = [id,
557                       format_text(elem['state'], 'bold'),
558                       '[{0}/{1}]'.format(elem['count'], elem['limit']),
559                       format_num(elem['bytes'], suffix = 'B'),
560                       bitfield_to_str(elem['filter']['tx']),
561                       bitfield_to_str(elem['filter']['rx'])]
562
563                cap_table.add_rows([row], header=False)
564
565        cap_table.header(['ID', 'Status', 'Packets', 'Bytes', 'TX Ports', 'RX Ports'])
566        mon_table.header(['ID', 'Packets Seen', 'Bytes Seen', 'TX Ports', 'RX Ports'])
567
568        if cap_table._rows:
569            text_tables.print_table_with_header(cap_table, '\nActive Recorders')
570
571        if mon_table._rows:
572            text_tables.print_table_with_header(mon_table, '\nActive Monitor')
573
574
575