trex_stl_rx_service_arp.py revision 1f405257
1from .trex_stl_rx_service_api import RXServiceAPI
2
3from ..trex_stl_streams import STLStream, STLTXSingleBurst
4from ..trex_stl_packet_builder_scapy import STLPktBuilder
5
6from scapy.layers.l2 import Ether, ARP
7
8
class RXServiceARP(RXServiceAPI):
    """RX service that resolves the port's destination IPv4 address via ARP.

    One broadcast ARP request is generated for the destination address, and
    every received packet is inspected for an ARP reply whose sender IP
    equals that address.
    """

    def __init__(self, port_id):
        """Set up the service for ``port_id`` using the L3 layer mode."""
        super(RXServiceARP, self).__init__(
            port_id,
            layer_mode=RXServiceAPI.LAYER_MODE_L3,
        )

    def get_name(self):
        """Return the short name of this service."""
        return "ARP"

    def pre_execute(self):
        """Store the port's destination and source addresses on the service.

        The stored values are later indexed with the 'ipv4' and 'mac' keys.
        Returns the result of ``self.port.ok()``.
        """
        # keep both address records for request generation and reply matching
        self.dst = self.port.get_dst_addr()
        self.src = self.port.get_src_addr()

        return self.port.ok()

    def generate_request(self):
        """Build the ARP request to transmit.

        Returns a one-element list holding a single-burst stream that sends
        exactly one packet.
        """
        # Ethernet broadcast header followed by the ARP layer carrying our
        # own IPv4/MAC as sender and the destination IPv4 as target
        eth_layer = Ether(dst="ff:ff:ff:ff:ff:ff")
        arp_layer = ARP(
            psrc=self.src['ipv4'],
            pdst=self.dst['ipv4'],
            hwsrc=self.src['mac'],
        )

        builder = STLPktBuilder(pkt=eth_layer / arp_layer)
        burst = STLTXSingleBurst(total_pkts=1)

        return [STLStream(packet=builder, mode=burst)]

    def on_pkt_rx(self, pkt, start_ts):
        """Inspect one received packet and report a matching ARP reply.

        ``pkt['binary']`` is parsed as an Ethernet frame; ``start_ts`` is not
        used here. Returns ``None`` when the packet is not the expected reply,
        otherwise ``self.port.ok(...)`` carrying a dict with the reply's
        'psrc' and 'hwsrc' fields.
        """
        frame = Ether(pkt['binary'])

        # anything without an ARP layer is ignored
        if 'ARP' not in frame:
            return None

        reply = frame['ARP']

        # accept only an ARP reply (opcode 2) sent by the address we asked for
        if reply.op == 2 and reply.psrc == self.dst['ipv4']:
            return self.port.ok({'psrc' : reply.psrc, 'hwsrc': reply.hwsrc})

        return None

    def on_timeout_err(self, retries):
        """Return a port error stating that no ARP response was received."""
        message = 'failed to receive ARP response ({0} retries)'.format(retries)
        return self.port.err(message)
55
56
57
58