1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21
22#include "trex_publisher.h"
23#include "trex_rpc_zip.h"
24#include <zmq.h>
25#include <assert.h>
26#include <sstream>
27#include <iostream>
28
29/**
30 * create the publisher
31 *
32 */
33bool
34TrexPublisher::Create(uint16_t port, bool disable){
35
36    char thread_name[256];
37
38    if (disable) {
39        return (true);
40    }
41
42    m_context = zmq_ctx_new();
43    if ( m_context == 0 ) {
44        show_zmq_last_error("can't connect to ZMQ library");
45    }
46
47    /* change the pthread name temporarly for the socket creation */
48    pthread_getname_np(pthread_self(), thread_name, sizeof(thread_name));
49    pthread_setname_np(pthread_self(), "Trex Publisher");
50
51    m_publisher = zmq_socket (m_context, ZMQ_PUB);
52
53    /* restore it */
54    pthread_setname_np(pthread_self(), thread_name);
55
56    if ( m_context == 0 ) {
57        show_zmq_last_error("can't create ZMQ socket");
58    }
59
60    std::stringstream ss;
61    ss << "tcp://*:" << port;
62
63    int rc = zmq_bind (m_publisher, ss.str().c_str());
64    if (rc != 0 ) {
65        show_zmq_last_error("can't bind to ZMQ socket at " + ss.str());
66    }
67
68    std::cout << "zmq publisher at: " << ss.str() << "\n";
69    return (true);
70}
71
72
73void
74TrexPublisher::Delete(){
75    if (m_publisher) {
76
77        /* before calling zmq_close set the linger property to zero
78           (othersie zmq_ctx_destroy might hang forever)
79         */
80        int val = 0;
81        zmq_setsockopt(m_publisher, ZMQ_LINGER, &val, sizeof(val));
82
83        zmq_close (m_publisher);
84        m_publisher = NULL;
85    }
86
87    if (m_context) {
88        zmq_ctx_destroy (m_context);
89        m_context = NULL;
90    }
91
92}
93
94
95void
96TrexPublisher::publish_json(const std::string &s, uint32_t zip_threshold){
97
98    if (m_publisher) {
99        if ( (zip_threshold != 0) && (s.size() > zip_threshold) ) {
100            publish_zipped_json(s);
101        } else {
102            publish_raw_json(s);
103        }
104    }
105}
106
107void
108TrexPublisher::publish_zipped_json(const std::string &s) {
109    std::string compressed_msg;
110
111    TrexRpcZip::compress(s, compressed_msg);
112    int size = zmq_send (m_publisher, compressed_msg.c_str(), compressed_msg.length(), 0);
113    assert(size == compressed_msg.length());
114}
115
116void
117TrexPublisher::publish_raw_json(const std::string &s) {
118     int size = zmq_send (m_publisher, s.c_str(), s.length(), 0);
119     assert(size == s.length());
120}
121
122void
123TrexPublisher::publish_event(event_type_e type, const Json::Value &data) {
124    Json::FastWriter writer;
125    Json::Value value;
126    std::string s;
127
128    value["name"] = "trex-event";
129    value["type"] = type;
130    value["data"] = data;
131
132    s = writer.write(value);
133    publish_json(s);
134}
135
136void
137TrexPublisher::publish_barrier(uint32_t key) {
138    Json::FastWriter writer;
139    Json::Value value;
140    std::string s;
141
142    value["name"] = "trex-barrier";
143    value["type"] = key;
144    value["data"] = Json::objectValue;
145
146    s = writer.write(value);
147    publish_json(s);
148}
149
150
151/**
152 * error handling
153 *
154 */
155void
156TrexPublisher::show_zmq_last_error(const std::string &err){
157    std::cout << " ERROR " << err << "\n";
158    std::cout << " ZMQ: " << zmq_strerror (zmq_errno ());
159    exit(-1);
160}
161
162