1#!/usr/bin/env python3
2
3import unittest
4from framework import VppTestCase, VppTestRunner
5from util import ppp
6from scapy.packet import Raw
7from scapy.layers.inet import IP, UDP
8from syslog_rfc5424_parser import SyslogMessage, ParseError
9from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
10from vpp_papi import VppEnum
11
12
13class TestSyslog(VppTestCase):
14    """ Syslog Protocol Test Cases """
15
16    @property
17    def SYSLOG_SEVERITY(self):
18        return VppEnum.vl_api_syslog_severity_t
19
20    @classmethod
21    def setUpClass(cls):
22        super(TestSyslog, cls).setUpClass()
23
24        try:
25            cls.pg0, = cls.create_pg_interfaces(range(1))
26            cls.pg0.admin_up()
27            cls.pg0.config_ip4()
28            cls.pg0.resolve_arp()
29
30        except Exception:
31            super(TestSyslog, cls).tearDownClass()
32            raise
33
34    @classmethod
35    def tearDownClass(cls):
36        super(TestSyslog, cls).tearDownClass()
37
38    def syslog_generate(self, facility, severity, appname, msgid, sd=None,
39                        msg=None):
40        """
41        Generate syslog message
42
43        :param facility: facility value
44        :param severity: severity level
45        :param appname: application name that originate message
46        :param msgid: message identifier
47        :param sd: structured data (optional)
48        :param msg: free-form message (optional)
49        """
50        facility_str = ['kernel', 'user-level', 'mail-system',
51                        'system-daemons', 'security-authorization', 'syslogd',
52                        'line-printer', 'network-news', 'uucp', 'clock-daemon',
53                        '', 'ftp-daemon', 'ntp-subsystem', 'log-audit',
54                        'log-alert', '', 'local0', 'local1', 'local2',
55                        'local3', 'local4', 'local5', 'local6', 'local7']
56
57        severity_str = ['emergency', 'alert', 'critical', 'error', 'warning',
58                        'notice', 'informational', 'debug']
59
60        cli_str = "test syslog %s %s %s %s" % (facility_str[facility],
61                                               severity_str[severity],
62                                               appname,
63                                               msgid)
64        if sd is not None:
65            for sd_id, sd_params in sd.items():
66                cli_str += " sd-id %s" % (sd_id)
67                for name, value in sd_params.items():
68                    cli_str += " sd-param %s %s" % (name, value)
69        if msg is not None:
70            cli_str += " %s" % (msg)
71        self.vapi.cli(cli_str)
72
73    def syslog_verify(self, data, facility, severity, appname, msgid, sd=None,
74                      msg=None):
75        """
76        Verify syslog message
77
78        :param data: syslog message
79        :param facility: facility value
80        :param severity: severity level
81        :param appname: application name that originate message
82        :param msgid: message identifier
83        :param sd: structured data (optional)
84        :param msg: free-form message (optional)
85        """
86        message = data.decode('utf-8')
87        if sd is None:
88            sd = {}
89        try:
90            message = SyslogMessage.parse(message)
91        except ParseError as e:
92            self.logger.error(e)
93            raise
94        else:
95            self.assertEqual(message.facility, facility)
96            self.assertEqual(message.severity, severity)
97            self.assertEqual(message.appname, appname)
98            self.assertEqual(message.msgid, msgid)
99            self.assertEqual(message.msg, msg)
100            self.assertEqual(message.sd, sd)
101            self.assertEqual(message.version, 1)
102            self.assertEqual(message.hostname, self.pg0.local_ip4)
103
104    def test_syslog(self):
105        """ Syslog Protocol test """
106        self.vapi.syslog_set_sender(src_address=self.pg0.local_ip4,
107                                    collector_address=self.pg0.remote_ip4)
108        config = self.vapi.syslog_get_sender()
109        self.assertEqual(str(config.collector_address),
110                         self.pg0.remote_ip4)
111        self.assertEqual(config.collector_port, 514)
112        self.assertEqual(str(config.src_address), self.pg0.local_ip4)
113        self.assertEqual(config.vrf_id, 0)
114        self.assertEqual(config.max_msg_size, 480)
115
116        appname = 'test'
117        msgid = 'testMsg'
118        msg = 'this is message'
119        sd1 = {'exampleSDID@32473': {'iut': '3',
120                                     'eventSource': 'App',
121                                     'eventID': '1011'}}
122        sd2 = {'exampleSDID@32473': {'iut': '3',
123                                     'eventSource': 'App',
124                                     'eventID': '1011'},
125               'examplePriority@32473': {'class': 'high'}}
126
127        self.pg_enable_capture(self.pg_interfaces)
128        self.syslog_generate(SyslogFacility.local7,
129                             SyslogSeverity.info,
130                             appname,
131                             msgid,
132                             None,
133                             msg)
134        capture = self.pg0.get_capture(1)
135        try:
136            self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
137            self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
138            self.assertEqual(capture[0][UDP].dport, 514)
139            self.assert_packet_checksums_valid(capture[0], False)
140        except:
141            self.logger.error(ppp("invalid packet:", capture[0]))
142            raise
143        self.syslog_verify(capture[0][Raw].load,
144                           SyslogFacility.local7,
145                           SyslogSeverity.info,
146                           appname,
147                           msgid,
148                           None,
149                           msg)
150
151        self.pg_enable_capture(self.pg_interfaces)
152        self.vapi.syslog_set_filter(
153            self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
154        filter = self.vapi.syslog_get_filter()
155        self.assertEqual(filter.severity,
156                         self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
157        self.syslog_generate(SyslogFacility.local7,
158                             SyslogSeverity.info,
159                             appname,
160                             msgid,
161                             None,
162                             msg)
163        self.pg0.assert_nothing_captured()
164
165        self.pg_enable_capture(self.pg_interfaces)
166        self.syslog_generate(SyslogFacility.local6,
167                             SyslogSeverity.warning,
168                             appname,
169                             msgid,
170                             sd1,
171                             msg)
172        capture = self.pg0.get_capture(1)
173        self.syslog_verify(capture[0][Raw].load,
174                           SyslogFacility.local6,
175                           SyslogSeverity.warning,
176                           appname,
177                           msgid,
178                           sd1,
179                           msg)
180
181        self.vapi.syslog_set_sender(self.pg0.local_ip4,
182                                    self.pg0.remote_ip4,
183                                    collector_port=12345)
184        config = self.vapi.syslog_get_sender()
185        self.assertEqual(config.collector_port, 12345)
186
187        self.pg_enable_capture(self.pg_interfaces)
188        self.syslog_generate(SyslogFacility.local5,
189                             SyslogSeverity.err,
190                             appname,
191                             msgid,
192                             sd2,
193                             None)
194        capture = self.pg0.get_capture(1)
195        try:
196            self.assertEqual(capture[0][UDP].dport, 12345)
197        except:
198            self.logger.error(ppp("invalid packet:", capture[0]))
199            raise
200        self.syslog_verify(capture[0][Raw].load,
201                           SyslogFacility.local5,
202                           SyslogSeverity.err,
203                           appname,
204                           msgid,
205                           sd2,
206                           None)
207
208
209if __name__ == '__main__':
210    unittest.main(testRunner=VppTestRunner)
211