scapy_isolated_test.py revision 45652ff4
1#!/usr/bin/python
2import sys, os
3from multiprocessing import Process
4import tempfile
5
6def check_offsets(build, stdout, scapy_str = None, pcap = None):
7    import sys
8    sys.stdout = stdout
9    sys.stderr = stdout
10
11    # clean this env
12    for key in sys.modules.copy().keys():
13        if key.startswith('scapy.'):
14            del sys.modules[key]
15    globals().clear()
16
17    import os
18    import outer_packages
19    from scapy.all import Ether, IP, UDP
20
21    if scapy_str:
22        pkt = eval(scapy_str)
23    elif pcap:
24        from scapy.utils import rdpcap
25        pkt = rdpcap(pcap)[0]
26    else:
27        raise Exception('Please specify scapy_str or pcap.')
28
29    if build:
30        pkt.build()
31
32    assert pkt
33    assert pkt.payload
34
35    lay = pkt
36    while lay:
37        print('  ### %s (offset %s)' % (lay.name, lay._offset))
38        lay.dump_fields_offsets()
39        if lay == pkt:
40            assert lay._offset == 0, 'Offset of first layer should be zero.'
41        else:
42            if build:
43                assert lay._offset != 0, 'Offset of second and further layers should not be zero if packets is built.'
44            else:
45                assert lay._offset == 0, 'Offset of second and further layers should be zero if packets is not built.'
46        for index, field in enumerate(lay.fields_desc):
47            if index == 0:
48                assert field._offset == 0, 'Offset of first field should be zero.'
49            else:
50                if build:
51                    if field.get_size_bytes() == 0:
52                        continue
53                    assert field._offset != 0, 'Offset of second and further fields should not be zero if packets is built.'
54                else:
55                    assert field._offset == 0, 'Offset of second and further fields should be zero if packets is not built.'
56
57        lay = lay.payload
58
59
60def isolate_env(f, *a, **k):
61    with tempfile.TemporaryFile(mode = 'w+') as tmpfile:
62        k.update({'stdout': tmpfile})
63        p = Process(target = f, args = a, kwargs = k)
64        p.start()
65        p.join()
66        print('')
67        tmpfile.seek(0)
68        print(tmpfile.read())
69    if p.exitcode:
70        raise Exception('Return status not zero, check the output')
71
72class CScapyOffsets_Test():
73    def setUp(self):
74        self.dir = os.path.abspath(os.path.dirname(__file__)) + '/'
75
76    def test_offsets_udp_build(self):
77        isolate_env(check_offsets, scapy_str = "Ether()/IP()/UDP()/('x'*9)", build = True)
78
79    def test_offsets_udp_nobuild(self):
80        isolate_env(check_offsets, scapy_str = "Ether()/IP()/UDP()/('x'*9)", build = False)
81
82    def test_offsets_pcap_build(self):
83        isolate_env(check_offsets, pcap = self.dir + 'golden/bp_sim_dns_vlans.pcap', build = True)
84
85    def test_offsets_pcap_nobuild(self):
86        isolate_env(check_offsets, pcap = self.dir + 'golden/bp_sim_dns_vlans.pcap', build = False)
87
88