basedevice.py revision d3907f0d
1"""Classes for running 0MQ Devices in the background."""
2
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
5
6
7import time
8from threading import Thread
9from multiprocessing import Process
10
11from zmq import device, QUEUE, Context, ETERM, ZMQError
12
13
14class Device:
15    """A 0MQ Device to be run in the background.
16
17    You do not pass Socket instances to this, but rather Socket types::
18
19        Device(device_type, in_socket_type, out_socket_type)
20
21    For instance::
22
23        dev = Device(zmq.QUEUE, zmq.DEALER, zmq.ROUTER)
24
25    Similar to zmq.device, but socket types instead of sockets themselves are
26    passed, and the sockets are created in the work thread, to avoid issues
27    with thread safety. As a result, additional bind_{in|out} and
28    connect_{in|out} methods and setsockopt_{in|out} allow users to specify
29    connections for the sockets.
30
31    Parameters
32    ----------
33    device_type : int
34        The 0MQ Device type
35    {in|out}_type : int
36        zmq socket types, to be passed later to context.socket(). e.g.
37        zmq.PUB, zmq.SUB, zmq.REQ. If out_type is < 0, then in_socket is used
38        for both in_socket and out_socket.
39
40    Methods
41    -------
42    bind_{in_out}(iface)
43        passthrough for ``{in|out}_socket.bind(iface)``, to be called in the thread
44    connect_{in_out}(iface)
45        passthrough for ``{in|out}_socket.connect(iface)``, to be called in the
46        thread
47    setsockopt_{in_out}(opt,value)
48        passthrough for ``{in|out}_socket.setsockopt(opt, value)``, to be called in
49        the thread
50
51    Attributes
52    ----------
53    daemon : int
54        sets whether the thread should be run as a daemon
55        Default is true, because if it is false, the thread will not
56        exit unless it is killed
57    context_factory : callable (class attribute)
58        Function for creating the Context. This will be Context.instance
59        in ThreadDevices, and Context in ProcessDevices.  The only reason
60        it is not instance() in ProcessDevices is that there may be a stale
61        Context instance already initialized, and the forked environment
62        should *never* try to use it.
63    """
64
65    context_factory = Context.instance
66    """Callable that returns a context. Typically either Context.instance or Context,
67    depending on whether the device should share the global instance or not.
68    """
69
70    def __init__(self, device_type=QUEUE, in_type=None, out_type=None):
71        self.device_type = device_type
72        if in_type is None:
73            raise TypeError("in_type must be specified")
74        if out_type is None:
75            raise TypeError("out_type must be specified")
76        self.in_type = in_type
77        self.out_type = out_type
78        self._in_binds = []
79        self._in_connects = []
80        self._in_sockopts = []
81        self._out_binds = []
82        self._out_connects = []
83        self._out_sockopts = []
84        self.daemon = True
85        self.done = False
86
87    def bind_in(self, addr):
88        """Enqueue ZMQ address for binding on in_socket.
89
90        See zmq.Socket.bind for details.
91        """
92        self._in_binds.append(addr)
93
94    def connect_in(self, addr):
95        """Enqueue ZMQ address for connecting on in_socket.
96
97        See zmq.Socket.connect for details.
98        """
99        self._in_connects.append(addr)
100
101    def setsockopt_in(self, opt, value):
102        """Enqueue setsockopt(opt, value) for in_socket
103
104        See zmq.Socket.setsockopt for details.
105        """
106        self._in_sockopts.append((opt, value))
107
108    def bind_out(self, addr):
109        """Enqueue ZMQ address for binding on out_socket.
110
111        See zmq.Socket.bind for details.
112        """
113        self._out_binds.append(addr)
114
115    def connect_out(self, addr):
116        """Enqueue ZMQ address for connecting on out_socket.
117
118        See zmq.Socket.connect for details.
119        """
120        self._out_connects.append(addr)
121
122    def setsockopt_out(self, opt, value):
123        """Enqueue setsockopt(opt, value) for out_socket
124
125        See zmq.Socket.setsockopt for details.
126        """
127        self._out_sockopts.append((opt, value))
128
129    def _setup_sockets(self):
130        ctx = self.context_factory()
131
132        self._context = ctx
133
134        # create the sockets
135        ins = ctx.socket(self.in_type)
136        if self.out_type < 0:
137            outs = ins
138        else:
139            outs = ctx.socket(self.out_type)
140
141        # set sockopts (must be done first, in case of zmq.IDENTITY)
142        for opt,value in self._in_sockopts:
143            ins.setsockopt(opt, value)
144        for opt,value in self._out_sockopts:
145            outs.setsockopt(opt, value)
146
147        for iface in self._in_binds:
148            ins.bind(iface)
149        for iface in self._out_binds:
150            outs.bind(iface)
151
152        for iface in self._in_connects:
153            ins.connect(iface)
154        for iface in self._out_connects:
155            outs.connect(iface)
156
157        return ins,outs
158
159    def run_device(self):
160        """The runner method.
161
162        Do not call me directly, instead call ``self.start()``, just like a Thread.
163        """
164        ins,outs = self._setup_sockets()
165        device(self.device_type, ins, outs)
166
167    def run(self):
168        """wrap run_device in try/catch ETERM"""
169        try:
170            self.run_device()
171        except ZMQError as e:
172            if e.errno == ETERM:
173                # silence TERM errors, because this should be a clean shutdown
174                pass
175            else:
176                raise
177        finally:
178            self.done = True
179
180    def start(self):
181        """Start the device. Override me in subclass for other launchers."""
182        return self.run()
183
184    def join(self,timeout=None):
185        """wait for me to finish, like Thread.join.
186
187        Reimplemented appropriately by subclasses."""
188        tic = time.time()
189        toc = tic
190        while not self.done and not (timeout is not None and toc-tic > timeout):
191            time.sleep(.001)
192            toc = time.time()
193
194
195class BackgroundDevice(Device):
196    """Base class for launching Devices in background processes and threads."""
197
198    launcher=None
199    _launch_class=None
200
201    def start(self):
202        self.launcher = self._launch_class(target=self.run)
203        self.launcher.daemon = self.daemon
204        return self.launcher.start()
205
206    def join(self, timeout=None):
207        return self.launcher.join(timeout=timeout)
208
209
210class ThreadDevice(BackgroundDevice):
211    """A Device that will be run in a background Thread.
212
213    See Device for details.
214    """
215    _launch_class=Thread
216
217class ProcessDevice(BackgroundDevice):
218    """A Device that will be run in a background Process.
219
220    See Device for details.
221    """
222    _launch_class=Process
223    context_factory = Context
224    """Callable that returns a context. Typically either Context.instance or Context,
225    depending on whether the device should share the global instance or not.
226    """
227
228
229__all__ = ['Device', 'ThreadDevice', 'ProcessDevice']
230