trex_stl_rx_service_icmp.py revision 5f825e20
1from .trex_stl_rx_service_api import RXServiceAPI
2
3from ..trex_stl_streams import STLStream, STLTXSingleBurst
4from ..trex_stl_packet_builder_scapy import STLPktBuilder
5
6from scapy.layers.l2 import Ether
7from scapy.layers.inet import IP, ICMP
8
9
10class RXServiceICMP(RXServiceAPI):
11
12    def __init__ (self, port, ping_ip, pkt_size):
13
14        super(RXServiceICMP, self).__init__(port, layer_mode = RXServiceAPI.LAYER_MODE_L3)
15        self.ping_ip  = ping_ip
16        self.pkt_size = pkt_size
17
18    def get_name (self):
19        return "PING"
20
21    def pre_execute (self):
22
23        if not self.port.is_resolved():
24            return self.port.err('ping - port has an unresolved destination, cannot determine next hop MAC address')
25
26        self.layer_cfg = dict(self.port.get_layer_cfg())
27
28        return self.port.ok()
29
30
31    # return a list of streams for request
32    def generate_request (self):
33
34        base_pkt = Ether(dst = self.layer_cfg['ether']['dst'])/IP(src = self.layer_cfg['ipv4']['src'], dst = self.ping_ip)/ICMP(type = 8)
35        pad = max(0, self.pkt_size - len(base_pkt))
36
37        base_pkt = base_pkt / (pad * 'x')
38
39        s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) )
40
41        self.base_pkt = base_pkt
42
43        return [s1]
44
45    def on_pkt_rx (self, pkt, start_ts):
46
47        scapy_pkt = Ether(pkt['binary'])
48        if not 'ICMP' in scapy_pkt:
49            return None
50
51        ip = scapy_pkt['IP']
52        if ip.dst != self.layer_cfg['ipv4']['src']:
53            return None
54
55        icmp = scapy_pkt['ICMP']
56
57        dt = pkt['ts'] - start_ts
58
59        # echo reply
60        if icmp.type == 0:
61            # check seq
62            if icmp.seq != self.base_pkt['ICMP'].seq:
63                return None
64            return self.port.ok('Reply from {0}: bytes={1}, time={2:.2f}ms, TTL={3}'.format(ip.src, len(pkt['binary']), dt * 1000, ip.ttl))
65
66        # unreachable
67        elif icmp.type == 3:
68            # check seq
69            if icmp.payload.seq != self.base_pkt['ICMP'].seq:
70                return None
71            return self.port.ok('Reply from {0}: Destination host unreachable'.format(icmp.src))
72
73        else:
74            # skip any other types
75            #scapy_pkt.show2()
76            return None
77
78
79
80    # return the str of a timeout err
81    def on_timeout_err (self, retries):
82        return self.port.ok('Request timed out.')
83
84