1#!/usr/bin/env python
2# -- Content-Encoding: UTF-8 --
3"""
Cached thread pool, inspired by the Pelix/iPOPO thread pool
5
6:author: Thomas Calmant
7:copyright: Copyright 2015, isandlaTech
8:license: Apache License 2.0
9:version: 0.2.5
10
11..
12
13    Copyright 2015 isandlaTech
14
15    Licensed under the Apache License, Version 2.0 (the "License");
16    you may not use this file except in compliance with the License.
17    You may obtain a copy of the License at
18
19        http://www.apache.org/licenses/LICENSE-2.0
20
21    Unless required by applicable law or agreed to in writing, software
22    distributed under the License is distributed on an "AS IS" BASIS,
23    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24    See the License for the specific language governing permissions and
25    limitations under the License.
26"""
27
# Format of the documentation strings of this module (reStructuredText)
__docformat__ = "restructuredtext en"

# Module version: tuple form, then its dotted string representation
__version_info__ = (0, 2, 5)
__version__ = ".".join(map(str, __version_info__))
34
35# ------------------------------------------------------------------------------
36
37# Standard library
38import logging
39import threading
40
41try:
42    # Python 3
43    # pylint: disable=F0401
44    import queue
45except ImportError:
46    # Python 2
47    # pylint: disable=F0401
48    import Queue as queue
49
50# ------------------------------------------------------------------------------
51
52
class EventData(object):
    """
    Couples a ``threading.Event`` with a payload.

    The payload is either a value (see :meth:`set`) or an exception that
    will be raised in the thread calling :meth:`wait`
    (see :meth:`raise_exception`).
    """
    def __init__(self):
        """
        Creates an unset event, holding neither a value nor an exception
        """
        self.__event = threading.Event()
        self.__data = None
        self.__exception = None

    def __store(self, data, exception):
        """
        Replaces both payload fields at once

        :param data: The value to associate to the event (or None)
        :param exception: The exception to raise in wait() (or None)
        """
        self.__data = data
        self.__exception = exception

    @property
    def data(self):
        """
        The value given to the last call to set() (None otherwise)
        """
        return self.__data

    @property
    def exception(self):
        """
        The exception given to the last call to raise_exception()
        (None otherwise)
        """
        return self.__exception

    def is_set(self):
        """
        Tells whether the underlying event is currently set

        :return: True if the event is set, else False
        """
        return self.__event.is_set()

    def clear(self):
        """
        Resets the event, then forgets the stored value and exception
        """
        self.__event.clear()
        self.__store(None, None)

    def set(self, data=None):
        """
        Stores a value, drops any stored exception, then sets the event

        :param data: The value to associate to the event
        """
        self.__store(data, None)
        self.__event.set()

    def raise_exception(self, exception):
        """
        Stores an exception, drops any stored value, then sets the event.
        The exception will be raised by wait().

        :param exception: An Exception object
        """
        self.__store(None, exception)
        self.__event.set()

    def wait(self, timeout=None):
        """
        Blocks until the event is set or until the timeout expires

        :param timeout: Wait timeout (in seconds)
        :return: True if the event has been set, else False
        :raise Exception: The exception given to raise_exception(), if any
        """
        triggered = self.__event.wait(timeout)
        if not triggered:
            # Python 2.6: Event.wait() always returns None, so the state
            # of the event must be read explicitly
            triggered = self.__event.is_set()

        # pylint: disable=E0702
        # Pylint seems to miss the "is not None" check below
        if self.__exception is not None:
            raise self.__exception

        return triggered
126
127
class FutureResult(object):
    """
    An object to wait for the result of a threaded execution.

    The job is run with :meth:`execute` (typically in a worker thread) while
    another thread waits for its outcome with :meth:`result`, polls it with
    :meth:`done` or registers a callback with :meth:`set_callback`.
    """
    def __init__(self, logger=None):
        """
        Sets up the FutureResult object

        :param logger: The Logger to use in case of error (optional)
        """
        self._logger = logger or logging.getLogger(__name__)
        self._done_event = EventData()
        self.__callback = None
        self.__extra = None

        # set_callback() and execute() usually run in different threads:
        # the lock keeps the (callback, extra) pair consistent and the flag
        # ensures a registered callback is notified only once
        self.__lock = threading.Lock()
        self.__notified = False

    def __notify(self):
        """
        Notify the registered callback about the result of the execution.

        Does nothing if no callback is registered or if the currently
        registered callback has already been notified.
        Exceptions raised by the callback are logged, not propagated.
        """
        with self.__lock:
            callback = self.__callback
            if callback is None or self.__notified:
                return

            # Read the extra argument in the same critical section as the
            # callback, so that they always come from the same registration
            extra = self.__extra
            self.__notified = True

        # Call the method outside the lock: it might call set_callback()
        try:
            callback(self._done_event.data, self._done_event.exception, extra)
        except Exception as ex:
            self._logger.exception("Error calling back method: %s", ex)

    def set_callback(self, method, extra=None):
        """
        Sets a callback method, called once the result has been computed or in
        case of exception. If the execution has already finished, the method
        is called immediately, in the calling thread.

        The callback method must have the following signature:
        ``callback(result, exception, extra)``.

        :param method: The method to call back in the end of the execution
        :param extra: Extra parameter to be given to the callback method
        """
        with self.__lock:
            self.__callback = method
            self.__extra = extra
            # New registration: it has not been notified yet
            self.__notified = False

        if self._done_event.is_set():
            # The execution has already finished
            self.__notify()

    def execute(self, method, args, kwargs):
        """
        Execute the given method and stores its result.
        The result is considered "done" even if the method raises an exception

        :param method: The method to execute
        :param args: Method positional arguments
        :param kwargs: Method keyword arguments
        :raise Exception: The exception raised by the method
        """
        # Normalize arguments
        if args is None:
            args = []

        if kwargs is None:
            kwargs = {}

        try:
            # Call the method
            result = method(*args, **kwargs)
        except Exception as ex:
            # Something went wrong: propagate to the event and to the caller
            self._done_event.raise_exception(ex)
            raise
        else:
            # Store the result
            self._done_event.set(result)
        finally:
            # In any case: notify the call back (if any)
            self.__notify()

    def done(self):
        """
        Returns True if the job has finished, else False
        """
        return self._done_event.is_set()

    def result(self, timeout=None):
        """
        Waits up to timeout for the result of the threaded job.
        Returns immediately the result if the job has already been done.

        :param timeout: The maximum time to wait for a result (in seconds)
        :return: The value returned by the executed method
        :raise OSError: The timeout raised before the job finished
        :raise Exception: The exception encountered during the call, if any
        """
        if self._done_event.wait(timeout):
            return self._done_event.data

        raise OSError("Timeout raised")
222
223# ------------------------------------------------------------------------------
224
225
class ThreadPool(object):
    """
    Executes the tasks stored in a FIFO in a thread pool.

    Worker threads are created on demand, up to ``max_threads``. A worker
    retires when it finds the queue empty while the pool holds more than
    ``min_threads`` threads.
    """
    def __init__(self, max_threads, min_threads=1, queue_size=0, timeout=60,
                 logname=None):
        """
        Sets up the thread pool.

        The pool is created stopped: call start() to run the worker threads.
        The timeout argument (60 seconds by default) is the time a worker
        waits for a task before considering it has been idle.

        :param max_threads: Maximum size of the thread pool
        :param min_threads: Minimum size of the thread pool (clamped to the
                            [0, max_threads] range)
        :param queue_size: Size of the task queue (0 for infinite)
        :param timeout: Queue timeout (in seconds, 60s by default)
        :param logname: Name of the logger
        :raise ValueError: Invalid number of threads
        """
        # Validate parameters
        try:
            max_threads = int(max_threads)
            if max_threads < 1:
                raise ValueError("Pool size must be greater than 0")
        except (TypeError, ValueError) as ex:
            raise ValueError("Invalid pool size: {0}".format(ex))

        try:
            min_threads = int(min_threads)
            if min_threads < 0:
                min_threads = 0
            elif min_threads > max_threads:
                min_threads = max_threads
        except (TypeError, ValueError) as ex:
            raise ValueError("Invalid pool size: {0}".format(ex))

        # The logger
        self._logger = logging.getLogger(logname or __name__)

        # The loop control event (set: the pool is stopped)
        self._done_event = threading.Event()
        self._done_event.set()

        # The task queue
        try:
            queue_size = int(queue_size)
        except (TypeError, ValueError):
            # Not a valid integer
            queue_size = 0

        self._queue = queue.Queue(queue_size)
        self._timeout = timeout

        # Protects the thread list and the thread counters
        self.__lock = threading.RLock()

        # The thread pool
        self._min_threads = min_threads
        self._max_threads = max_threads
        self._threads = []

        # Thread count (used to name the threads)
        self._thread_id = 0

        # Current number of threads, alive and active (executing a task)
        self.__nb_threads = 0
        self.__nb_active_threads = 0

    def start(self):
        """
        Starts the thread pool. Does nothing if the pool is already started.
        """
        with self.__lock:
            if not self._done_event.is_set():
                # Stop event not set: we're running
                return

            # Clear the stop event
            self._done_event.clear()

            # Compute the number of threads to start to handle pending tasks:
            # one per task, kept in the [min_threads, max_threads] range
            nb_pending_tasks = self._queue.qsize()
            nb_threads = min(max(nb_pending_tasks, self._min_threads),
                             self._max_threads)

            # Create the threads
            for _ in range(nb_threads):
                self.__start_thread()

    def __start_thread(self):
        """
        Starts a new thread, if possible

        :return: True if a thread has been started, False if the pool is
                 stopped or has reached its maximum size
        """
        with self.__lock:
            if self.__nb_threads >= self._max_threads:
                # Can't create more threads
                return False

            if self._done_event.is_set():
                # We're stopped: do nothing
                return False

            # Prepare thread and start it
            name = "{0}-{1}".format(self._logger.name, self._thread_id)
            self._thread_id += 1

            thread = threading.Thread(target=self.__run, name=name)
            thread.daemon = True

            # Count the thread right now, under the lock, and not when it
            # begins to run: the next size check must already see it
            self.__nb_threads += 1
            self._threads.append(thread)
            try:
                thread.start()
            except Exception:
                # The thread could not be started: forget it
                self.__nb_threads -= 1
                self._threads.remove(thread)
                raise

            return True

    def stop(self):
        """
        Stops the thread pool. Does nothing if the pool is already stopped.

        Waits for the worker threads to terminate, then empties the queue.
        """
        with self.__lock:
            if self._done_event.is_set():
                # Stop event set: we're stopped
                return

            # Set the stop event
            self._done_event.set()

            # Copy the list of threads to wait for
            threads = self._threads[:]

        # Add something in the queue, to wake up the threads waiting for a
        # task. This is done outside the lock, as put() can block and the
        # workers need the lock to go on.
        try:
            for _ in threads:
                self._queue.put(self._done_event, True, self._timeout)
        except queue.Full:
            # There is already something in the queue
            pass

        # Join threads outside the lock
        for thread in threads:
            while thread.is_alive():
                # Wait 3 seconds
                thread.join(3)
                if thread.is_alive():
                    # Thread is still alive: something might be wrong
                    self._logger.warning("Thread %s is still alive...",
                                         thread.name)

        # Clear storage
        with self.__lock:
            del self._threads[:]

        self.clear()

    def enqueue(self, method, *args, **kwargs):
        """
        Queues a task in the pool

        :param method: Method to call
        :return: A FutureResult object, to get the result of the task
        :raise ValueError: Invalid method
        :raise Full: The task queue is full
        """
        if not hasattr(method, '__call__'):
            # Don't use method.__name__ here: a non-callable object is
            # likely not to have one
            raise ValueError("{0!r} has no __call__ member.".format(method))

        # Prepare the future result object
        future = FutureResult(self._logger)

        # Add the task to the queue. This is done outside the lock: put()
        # can block when the queue is bounded, and the workers need the lock
        # to come back and consume the queue.
        self._queue.put((method, args, kwargs, future), True, self._timeout)

        with self.__lock:
            idle_threads = self.__nb_threads - self.__nb_active_threads
            if self._queue.qsize() > idle_threads:
                # Not enough free threads for the pending tasks: try to
                # start a new one (does nothing if the pool is full/stopped)
                self.__start_thread()

        return future

    def clear(self):
        """
        Empties the current queue content.
        Returns once the queue has been emptied and the tasks being executed
        have finished.
        """
        with self.__lock:
            # Empty the current queue. The lock is held so that the code
            # starting or retiring threads never sees a half-emptied queue.
            try:
                while True:
                    self._queue.get_nowait()
                    self._queue.task_done()
            except queue.Empty:
                # Queue is now empty
                pass

        # Wait for the tasks currently executed.
        # Must be done outside the lock, as the workers need it to finish.
        self.join()

    def join(self, timeout=None):
        """
        Waits for all the tasks to be executed, i.e. for the queued tasks and
        for those being executed.

        :param timeout: Maximum time to wait (in seconds)
        :return: True if all tasks have been executed, else False
        """
        if timeout is None:
            # Use the original join (returns immediately if there is nothing
            # to wait for)
            self._queue.join()
            return True

        # Wait for the condition, only if there is something to wait for
        all_tasks_done = self._queue.all_tasks_done
        with all_tasks_done:
            if self._queue.unfinished_tasks:
                all_tasks_done.wait(timeout)

            return not bool(self._queue.unfinished_tasks)

    def __execute_task(self, task):
        """
        Executes a task taken from the queue and marks it as done.
        Exceptions raised by the task are logged, not propagated.

        :param task: A (method, args, kwargs, future) tuple
        """
        with self.__lock:
            self.__nb_active_threads += 1

        method = None
        try:
            # Extract elements
            method, args, kwargs, future = task

            # Call the method
            future.execute(method, args, kwargs)
        except Exception as ex:
            # Some callable objects have no __name__
            self._logger.exception("Error executing %s: %s",
                                   getattr(method, "__name__", repr(method)),
                                   ex)
        finally:
            # Mark the action as executed
            self._queue.task_done()

            # Thread is not active anymore
            with self.__lock:
                self.__nb_active_threads -= 1

    def __run(self):
        """
        The main loop of a worker thread
        """
        # True while this thread is included in the threads counter
        counted = True

        try:
            while not self._done_event.is_set():
                try:
                    # Wait for an action (blocking)
                    task = self._queue.get(True, self._timeout)
                except queue.Empty:
                    # Nothing to do yet
                    pass
                else:
                    if task is self._done_event:
                        # Stop event in the queue: get out
                        self._queue.task_done()
                        return

                    self.__execute_task(task)

                # Clean up thread if necessary
                with self.__lock:
                    if self.__nb_threads > self._min_threads \
                            and self._queue.empty():
                        # No more work for this thread, and we're above the
                        # minimum number of threads: stop this one.
                        # The counter is updated in the critical section of
                        # the test, so that enqueue() sees either a thread
                        # which stays or a thread already gone.
                        self.__nb_threads -= 1
                        counted = False
                        return
        finally:
            with self.__lock:
                # Thread stops
                if counted:
                    self.__nb_threads -= 1

                # Forget the thread object
                try:
                    self._threads.remove(threading.current_thread())
                except ValueError:
                    # Already removed (by stop())
                    pass
491