Source code for psycopg2_pool

"""This module implements connection pooling, which is needed because PostgreSQL
requires separate TCP connections for concurrent sessions.
"""

from collections import deque
from time import perf_counter as uptime
from weakref import WeakSet

import psycopg2
from psycopg2 import extensions as _ext


[docs] class PoolError(Exception): pass
[docs] class ConnectionPool: """A pool of :class:`psycopg2:connection` objects. .. attribute:: minconn The minimum number of connections to keep in the pool. By default one connection is opened when the pool is created. .. attribute:: maxconn The maximum number of connections in the pool. By default the pool will attempt to open as many connections as requested. .. attribute:: idle_timeout How many seconds to keep an idle connection before closing it. The default value causes idle connections to be closed after 10 minutes (approximately, it depends on :meth:`putconn` being called). .. attribute:: connect_kwargs The keyword arguments to pass to :func:`psycopg2.connect`. If the `dsn` argument isn't specified, then it's set to an empty string by default. The following attributes are internal, they're documented here to provide insight into how the pool works. .. attribute:: connections_in_use The set of connections that have been checked out of the pool through :meth:`getconn`. Type: :class:`weakref.WeakSet`. .. attribute:: idle_connections The pool of unused connections, last in first out. Type: :class:`collections.deque`. .. attribute:: return_times A timestamp is stored in this dict when a connection is added to :attr:`.idle_connections`. That timestamp is used in :meth:`getconn` to compute how long the connection stayed idle in the pool. Type: :class:`dict`. This class provides two main methods (:meth:`getconn` and :meth:`putconn`), plus another one that you probably don't need (:meth:`clear`). """ __slots__ = ( 'minconn', 'maxconn', 'idle_timeout', 'connect_kwargs', 'idle_connections', 'connections_in_use', 'return_times', '__dict__' ) def __init__(self, minconn=1, maxconn=float('inf'), idle_timeout=600, **connect_kwargs): self.minconn = minconn self.maxconn = maxconn self.idle_timeout = idle_timeout connect_kwargs.setdefault('dsn', '') self.connect_kwargs = connect_kwargs self.connections_in_use = WeakSet() self.idle_connections = deque() self.return_times = {} for i in range(self.minconn): self._connect() def _connect(self, for_immediate_use=False): """Open a new connection. """ conn = psycopg2.connect(**self.connect_kwargs) if for_immediate_use: self.connections_in_use.add(conn) else: self.return_times[conn] = uptime() self.idle_connections.append(conn) return conn
[docs] def getconn(self): """Get a connection from the pool. If there is no idle connection available, then a new one is opened; unless there are already :attr:`.maxconn` connections open, then a :class:`PoolError` exception is raised. Any connection that is broken, or has been idle for more than :attr:`.idle_timeout` seconds, is closed and discarded. """ while True: try: # Attempt to take an idle connection from the pool. conn = self.idle_connections.pop() except IndexError: # We don't have any idle connection available, open a new one. if len(self.connections_in_use) >= self.maxconn: raise PoolError("connection pool exhausted") conn = self._connect(for_immediate_use=True) else: # Close and discard the connection if it's broken or too old. idle_since = self.return_times.pop(conn, 0) close = ( conn.info.transaction_status != _ext.TRANSACTION_STATUS_IDLE or self.idle_timeout and idle_since < (uptime() - self.idle_timeout) ) if close: conn.close() continue break return conn
[docs] def putconn(self, conn): """Return a connection to the pool. You should always return a connection to the pool, even if you've closed it. That being said, the pool only holds weak references to connections returned by :meth:`getconn`, so they should be garbage collected even if you fail to return them. """ self.connections_in_use.discard(conn) # Determine if the connection should be kept or discarded. current_time = uptime() if self.idle_timeout == 0 and len(self.idle_connections) >= self.minconn: conn.close() else: status = conn.info.transaction_status if status == _ext.TRANSACTION_STATUS_UNKNOWN: # The connection is broken, discard it. conn.close() else: if status != _ext.TRANSACTION_STATUS_IDLE: # The connection is still in a transaction, roll it back. conn.rollback() self.return_times[conn] = current_time self.idle_connections.append(conn) # Clean up the idle connections. if self.idle_timeout: # We cap the number of iterations to ensure that we don't end up in # an infinite loop. for i in range(len(self.idle_connections)): try: conn = self.idle_connections[0] except IndexError: break return_time = self.return_times.get(conn) if return_time is None: # The connection's return time is missing, give up. break if return_time < (current_time - self.idle_timeout): # This connection has been idle too long, attempt to drop it. try: popped_conn = self.idle_connections.popleft() except IndexError: # Another thread removed this connection from the queue. continue if popped_conn == conn: # Okay, we can close and discard this connection. self.return_times.pop(conn, None) conn.close() else: # We got a different connection, put it back. self.idle_connections.appendleft(popped_conn) continue else: # The leftmost connection isn't too old, so we can assume # that the other ones aren't either. break # Open new connections if we've dropped below minconn. while (len(self.idle_connections) + len(self.connections_in_use)) < self.minconn: self._connect()
[docs] def clear(self): """Close and discard all idle connections in the pool (regardless of the values of :attr:`.minconn` and :attr:`.idle_timeout`). This method could be useful if you have periods of high activity that result in many connections being opened, followed by prolonged periods with zero activity (no calls to :meth:`getconn` or :meth:`putconn`), *and* you care about closing those extraneous connections during the inactivity period. It's up to you to call this method in that case. Alternatively you may want to run a cron task to `close idle connections from the server <https://stackoverflow.com/a/30769511/>`_. """ for conn in list(self.idle_connections): try: self.idle_connections.remove(conn) except ValueError: continue self.return_times.pop(conn, None) conn.close()
[docs] class ThreadSafeConnectionPool(ConnectionPool): """ This subclass of :class:`ConnectionPool` uses a :class:`threading.RLock` object to ensure that its methods are thread safe. """ __slots__ = ('lock',) def __init__(self, **kwargs): import threading super().__init__(**kwargs) self.lock = threading.RLock()
[docs] def getconn(self): """See :meth:`ConnectionPool.getconn`.""" with self.lock: return super().getconn()
[docs] def putconn(self, conn): """See :meth:`ConnectionPool.putconn`.""" with self.lock: return super().putconn(conn)
[docs] def clear(self): """See :meth:`ConnectionPool.clear`.""" with self.lock: return super().clear()