garbage.py revision 781d71db
1"""Garbage collection thread for representing zmq refcount of Python objects
2used in zero-copy sends.
3"""
4
5# Copyright (C) PyZMQ Developers
6# Distributed under the terms of the Modified BSD License.
7
8
9import atexit
10import struct
11
12from os import getpid
13from collections import namedtuple
14from threading import Thread, Event, Lock
15import warnings
16
17import zmq
18
19
# Entry held in GarbageCollector.refs while libzmq uses ``obj``'s memory;
# ``event`` (may be None) is set once zmq reports it is done with that memory.
gcref = namedtuple('gcref', 'obj event')
21
class GarbageCollectorThread(Thread):
    """Thread in which garbage collection actually happens.

    Binds a PULL socket on ``gc.url`` using ``gc.context``. For every packed
    key it receives, the matching entry is popped from ``gc.refs`` and its
    event (if any) is set. A ``b'DIE'`` message ends the loop.
    """

    def __init__(self, gc):
        super(GarbageCollectorThread, self).__init__()
        self.gc = gc
        self.daemon = True
        # pid of the creating process; compared against getpid() to detect fork
        self.pid = getpid()
        # set once socket setup has finished (or failed), so the starter can
        # stop waiting
        self.ready = Event()

    def run(self):
        # detect fork at beginning of the thread
        if getpid is None or getpid() != self.pid:
            self.ready.set()
            return
        try:
            s = self.gc.context.socket(zmq.PULL)
            s.linger = 0
            s.bind(self.gc.url)
        finally:
            self.ready.set()

        forked = False
        try:
            while True:
                # detect fork (getpid may also be None, presumably during
                # interpreter shutdown)
                if getpid is None or getpid() != self.pid:
                    # leave the socket alone: it is not ours to touch here
                    forked = True
                    return
                msg = s.recv()
                if msg == b'DIE':
                    break
                # the key is a packed size_t (4 or 8 bytes). '=' keeps native
                # byte order but uses standard sizes, so 'I'/'Q' are exactly
                # 4/8 bytes on every platform (native 'L' is 8 bytes on LP64).
                fmt = '=I' if len(msg) == 4 else '=Q'
                key = struct.unpack(fmt, msg)[0]
                tup = self.gc.refs.pop(key, None)
                if tup and tup.event:
                    tup.event.set()
                del tup
        finally:
            # Always close the socket, even if recv() raised (e.g. because the
            # context was terminated): an unclosed socket would keep
            # context.term() from ever returning.
            if not forked:
                s.close()
57
58
class GarbageCollector(object):
    """PyZMQ Garbage Collector

    Used for representing the reference held by libzmq during zero-copy sends.
    This object holds a dictionary of the Python objects whose memory is
    currently in use by zeromq. Each object is stored, together with an
    optional tracker event, in a ``gcref`` tuple, and the dictionary is keyed
    by the Python id of that tuple.

    When zeromq is done with the memory, it sends a message on an inproc PUSH socket
    containing the packed size_t (32 or 64-bit unsigned int),
    which is the key in the dict.
    When the PULL socket in the gc thread receives that message,
    the reference is popped from the dict,
    and any tracker events that should be signaled fire.
    """

    refs = None
    _context = None
    _lock = None
    url = "inproc://pyzmq.gc.01"

    def __init__(self, context=None):
        super(GarbageCollector, self).__init__()
        self.refs = {}
        # pid of the process that last started the gc thread (None: never started)
        self.pid = None
        self.thread = None
        self._context = context
        # serializes start() so concurrent store() calls launch only one thread
        self._lock = Lock()
        # set at interpreter exit so that store() never restarts the thread
        self._stay_down = False
        atexit.register(self._atexit)

    @property
    def context(self):
        """zmq Context used by the gc; a new ``zmq.Context()`` is created when none is set."""
        if self._context is None:
            self._context = zmq.Context()
        return self._context

    @context.setter
    def context(self, ctx):
        # Replacing the context stops a running gc thread first; warn when
        # references are still held, because stopping discards them.
        if self.is_alive():
            if self.refs:
                warnings.warn("Replacing gc context while gc is running", RuntimeWarning)
            self.stop()
        self._context = ctx

    def _atexit(self):
        """atexit callback

        sets _stay_down flag so that gc doesn't try to start up again in other atexit handlers
        """
        self._stay_down = True
        self.stop()

    def stop(self):
        """stop the garbage-collection thread"""
        if not self.is_alive():
            return
        self._stop()

    def _stop(self):
        """Tell the gc thread to exit, wait for it, then tear down the context."""
        push = self.context.socket(zmq.PUSH)
        try:
            push.connect(self.url)
            push.send(b'DIE')
        finally:
            # close even if connect/send fail, so this socket cannot keep
            # self.context.term() from completing later
            push.close()
        self.thread.join()
        self.context.term()
        self.refs.clear()
        # goes through the ``context`` setter; the thread is no longer alive,
        # so this only resets _context (a new one is made on next access)
        self.context = None

    def start(self):
        """Start a new garbage collection thread.

        Creates a new zmq Context used for garbage collection.
        Under most circumstances, this will only be called once per process.
        """
        if self.thread is not None and self.pid != getpid():
            # It's re-starting, must free earlier thread's context
            # since a fork probably broke it
            self._stop()
        self.pid = getpid()
        self.refs = {}
        self.thread = GarbageCollectorThread(self)
        self.thread.start()
        # block until the thread has bound its PULL socket (or failed to)
        self.thread.ready.wait()

    def is_alive(self):
        """Is the garbage collection thread currently running?

        Includes checks for process shutdown or fork.
        """
        if getpid is None or getpid() != self.pid:
            return False
        return self.thread is not None and self.thread.is_alive()

    def store(self, obj, event=None):
        """store an object and (optionally) event for zero-copy

        Returns the key under which the reference is held (the id of its
        ``gcref`` tuple), or 0 if the gc has been shut down at exit.
        """
        if not self.is_alive():
            if self._stay_down:
                return 0
            # safely start the gc thread
            # use lock and double check,
            # so we don't start multiple threads
            with self._lock:
                if not self.is_alive():
                    self.start()
        tup = gcref(obj, event)
        theid = id(tup)
        self.refs[theid] = tup
        return theid

    def __del__(self):
        # Best effort shutdown of a still-running gc thread. An exception
        # raised here is reported and ignored by Python, exactly as the old
        # catch-and-re-raise wrapper did.
        if self.is_alive():
            self.stop()
179
# Process-wide collector instance; its gc thread is started lazily by the
# first store() call rather than at import time.
gc = GarbageCollector(context=None)
181