r"""Contains the classes that represent the database interface.
The Database is managed by the :class:`DatabaseManager <pandemy.DatabaseManager>` class.
Each SQL-dialect is implemented as a subclass of :class:`DatabaseManager <pandemy.DatabaseManager>`.
"""
# ===============================================================
# Imports
# ===============================================================
# Standard Library
from __future__ import annotations
import itertools
import logging
from pathlib import Path
import re
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
import urllib
import warnings
# Third Party
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine import CursorResult, Engine, make_url, URL
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.sql import text
from sqlalchemy.sql.elements import TextClause
# Local
import pandemy
import pandemy._dataframe
import pandemy._datetime
# ===============================================================
# Set Logger
# ===============================================================
# Initiate the module logger
# Handlers and formatters will be inherited from the root logger
logger = logging.getLogger(__name__)
# ===============================================================
# DatabaseManager
# ===============================================================
[docs]class DatabaseManager():
r"""Base class with functionality for managing a database.
Each database type will subclass from :class:`DatabaseManager` and
implement the initializer which is specific to each database type.
Initialization of a :class:`DatabaseManager` creates the database `engine`,
which is used to connect to and interact with the database.
The :class:`DatabaseManager` can be used on its own but with **limited** functionality
and the initialization requires a SQLAlchemy :class:`URL <sqlalchemy.engine.URL>` or
:class:`Engine <sqlalchemy.engine.Engine>`, which require some knowledge about `SQLAlchemy`_.
.. _SQLAlchemy: https://docs.sqlalchemy.org/en/14/core/engines.html#engine-configuration
.. note::
Some methods like :meth:`~DatabaseManager.upsert_table` and :meth:`~DatabaseManager.merge_df`
use dialect specific SQL syntax. These methods may not work properly if using the
:class:`DatabaseManager` directly. :class:`DatabaseManager` should *only* be used
if there is no subclass implemented that matches the desired SQL dialect.
Parameters
----------
url : :class:`str` or :class:`sqlalchemy.engine.URL` or None, default None
A SQLAlchemy connection URL to use for creating the database engine.
If ``None`` an :class:`engine <sqlalchemy.engine.Engine>` is expected
to be supplied to the `engine` parameter.
container : SQLContainer or None, default None
A container of database statements that :class:`DatabaseManager` can use.
connect_args : dict or None, default None
Additional arguments sent to the driver upon connection that further
customizes the connection.
engine_config : dict or None, default None
Additional keyword arguments passed to the :func:`sqlalchemy.create_engine` function.
engine : :class:`sqlalchemy.engine.Engine` or None, default None
A SQLAlchemy Engine to use as the database engine of :class:`DatabaseManager`.
If ``None`` (the default) the engine will be created from `url`.
**kwargs : dict
Additional keyword arguments that are not used by :class:`DatabaseManager`.
Raises
------
pandemy.CreateConnectionURLError
If there are errors with `url`.
pandemy.CreateEngineError
If the creation of the database engine fails.
pandemy.InvalidInputError
If `url` and `engine` are specified or are ``None`` at the same time.
See Also
--------
* :class:`OracleDb` : An Oracle :class:`DatabaseManager`.
* :class:`SQLiteDb` : A SQLite :class:`DatabaseManager`.
Examples
--------
Create an instance of a :class:`DatabaseManager` that connects to a SQLite in-memory database.
>>> import pandemy
>>> db = pandemy.DatabaseManager(url='sqlite://')
>>> db
DatabaseManager(
url=sqlite://,
container=None,
connect_args={},
engine_config={},
engine=Engine(sqlite://)
)
"""
# Class variables
# ---------------
# The template statements can be modified by subclasses of DatabaseManager if necessary.
_stmt_space = ' ' # Spacing to use in template statements.
# Template statement for deleting all records in a table.
_delete_from_table_stmt: str = """DELETE FROM :table"""
# Template statement to update a table.
_update_table_stmt: str = (
"""UPDATE :table
SET
:update_cols
WHERE
:where_cols"""
)
# Template statement to insert new rows, that do not exist already, into a table.
_insert_into_where_not_exists_stmt: str = (
"""INSERT INTO :table (
:insert_cols
)
SELECT
:select_values
WHERE
NOT EXISTS (
SELECT
1
FROM :table
WHERE
:where_cols
)"""
)
# Template of the MERGE statement. If empty string the database does not support the statement.
_merge_df_stmt: str = ''
__slots__ = ('url', 'container', 'connect_args', 'engine_config', 'engine')
def __init__(
self,
url: Optional[Union[str, URL]] = None,
container: Optional[pandemy.SQLContainer] = None,
connect_args: Optional[Dict[str, Any]] = None,
engine_config: Optional[dict] = None,
engine: Optional[Engine] = None,
**kwargs
) -> None:
self.container = container
self.connect_args = connect_args if connect_args is not None else {}
self.engine_config = engine_config if engine_config is not None else {}
if url and engine:
raise pandemy.InvalidInputError(
'url and engine cannot be specified at the same time.\n'
f'url={url!r}, engine={engine!r}',
data=(url, engine)
)
if url is None and engine is None:
raise pandemy.InvalidInputError('Specify either url or engine. Both cannot be None at the same time.')
if url is not None:
try:
url = make_url(url) # Create a SQLAlchemy database URL
# Encode username and password to make special characters url compatible
username = url.username if (tmp := url.username) is None else urllib.parse.quote_plus(tmp)
password = url.password if (tmp := url.password) is None else urllib.parse.quote_plus(tmp)
self.url = url.set(username=username, password=password)
except UnicodeEncodeError as e:
raise pandemy.CreateConnectionURLError(
f'Could not URL encode username or password: {e.args}', data=e.args
) from None
except Exception as e:
raise pandemy.CreateConnectionURLError(message=f'{type(e).__name__}: {e.args}', data=e.args) from None
else:
self.url = url
# Create the engine
if engine is None:
try:
self.engine = create_engine(self.url, connect_args=self.connect_args, **self.engine_config)
except Exception as e:
raise pandemy.CreateEngineError(message=f'{type(e).__name__}: {e.args}', data=e.args) from None
else:
logger.debug(f'Successfully created database engine from url: {self.url}.')
else:
self.engine = engine
self.url = engine.url
def __str__(self) -> str:
r"""String representation of the object."""
return f'{self.__class__.__name__}({repr(self.url)})'
def __repr__(self) -> str:
r"""Debug representation of the object."""
# Get the slot names of the parent classes
slots = itertools.chain.from_iterable(
getattr(cls, '__slots__', tuple()) for cls in type(self).__mro__
)
attributes = {attrib: self.__getattribute__(attrib) for attrib in slots}
# The space to add before each new parameter on a new line
space = ' ' * 4
# The name of the class
repr_str = f'{self.__class__.__name__}(\n'
# Append the attribute names and values
for attrib, value in attributes.items():
if attrib == 'password': # Mask the password
value = '***'
repr_str += f'{space}{attrib}={value!r},\n'
# Remove last unwanted ', '
repr_str = repr_str[:-2]
# Add closing parentheses
repr_str += '\n)'
return repr_str
def _is_valid_table_name(self, table: str) -> None:
r"""Check if the table name is valid.
Parameters
----------
table : str
The table name to validate.
Raises
------
pandemy.InvalidInputError
If the table name is not a string.
pandemy.InvalidTableNameError
If the table name is not valid.
"""
if not isinstance(table, str):
raise pandemy.InvalidInputError(f'table must be a string. Got {type(table)} ({table})',
data=table)
# Get the first word of the string to prevent entering a SQL query.
table_splitted = table.split(' ')
# Check that only one word was input as the table name
if (len_table_splitted := len(table_splitted)) > 1:
raise pandemy.InvalidTableNameError(f'Table name contains spaces ({len_table_splitted - 1})! '
f'The table name must be a single word.\ntable = {table}',
data=table)
@staticmethod
def _validate_chunksize(chunksize: Optional[int]) -> None:
r"""Validate that the `chunksize` parameter is an integer > 0 or None.
The `chunksize` parameter is used by the methods:
- load_table
- merge_df
- save_df
- upsert_table
Parameters
----------
chunksize : int or None
The number of rows from a DataFrame to process in each chunk.
Raises
------
pandemy.InvalidInputError
If `chunksize` is not an integer > 0 or None.
"""
if chunksize is None:
return
if not isinstance(chunksize, int):
raise pandemy.InvalidInputError(
f'chunksize must be of type int, got {type(chunksize)}', data=chunksize
)
if chunksize <= 0:
raise pandemy.InvalidInputError(
f'chunksize ({chunksize}) be an integer > 0', data=chunksize
)
def _supports_merge_statement(self) -> None:
r""""Check if the :class:`DatabaseManger` supports the MERGE statement.
If is does not support the MERGE statement :exc:`pandemy.SQLStatementNotSupportedError` is raised.
Raises
------
pandemy.SQLStatementNotSupportedError
If the MERGE statement is not supported by the database SQL dialect.
"""
if not self._merge_df_stmt:
raise pandemy.SQLStatementNotSupportedError(
f'{self.__class__.__name__} does not support the MERGE statement.'
f'Try the similar upsert_table method instead.'
)
def _prepare_input_data_for_modify_statements(
self,
df: pd.DataFrame,
update_cols: Optional[Union[str, Sequence[str]]],
update_index_cols: Union[bool, Sequence[str]],
where_cols: Sequence[str]) -> Tuple[pd.DataFrame, List[str], List[str]]:
r"""Prepare input data to be ready to use in an UPSERT or MERGE statement.
Parameters
----------
df : pandas.DataFrame
The input DataFrame from which to select the columns to use in the statements.
update_cols : str or Sequence[str] or None
The columns to update and or insert data into.
The string 'all' includes all columns of `df`.
update_index_cols : bool or Sequence[str]
If the index columns of `df` should be included in the columns to update.
``True`` indicates that the index should be included. If the index is a :class:`pandas.MultiIndex`
a sequence of str that maps against the levels to include can be used to only include the desired levels.
``False`` excludes the index column(s) from being updated.
where_cols : Sequence[str]
The columns to include in the WHERE clause.
Returns
-------
df_output : pandas.DataFrame
`df` with columns not affected by the columns used in the statement removed.
update_cols : list of str
The columns to update.
insert_cols : list of str
The columns to insert data into.
Raises
------
pandemy.InvalidColumnNameError
If a column name of `update_cols`, `update_index_cols` or `where_cols` are not
among the columns of the input DataFrame `df`.
"""
# Get the columns to update and convert to list
if update_cols == 'all':
update_cols = df.columns.tolist()
elif update_cols is None:
update_cols = []
else:
update_cols = list(update_cols)
# Add selected index columns to the columns to update
if update_index_cols:
update_cols.extend(
list(df.index.names) if update_index_cols is True else list(update_index_cols)
)
insert_cols = update_cols
update_cols = [col for col in update_cols if col not in where_cols] # where_cols should not be updated
cols_in_stmts = list(set(update_cols + insert_cols + where_cols)) # The columns used in the statements
df_output = df.reset_index()
# Check for column names that are not part of the DataFrame
if len((invalid_cols := [col for col in cols_in_stmts if col not in df_output.columns])) > 0:
raise pandemy.InvalidColumnNameError(
f'Invalid column names: {invalid_cols}.\n'
f'Columns and index of DataFrame: {df_output.columns.tolist()}',
data=invalid_cols) from None
df_output = df_output[cols_in_stmts] # Keep only the columns affected by the statements
return df_output, update_cols, insert_cols
def _create_update_statement(self,
table: str,
update_cols: Sequence[str],
where_cols: Sequence[str],
space_factor: int = 1) -> str:
r"""Create an UPDATE statement with placeholders parametrized.
Creates the statement from the template `self._update_table_stmt`.
Parameters
----------
table : str
The name of the table to update.
update_cols : Sequence[str]
The columns to update.
where_cols : Sequence[str]
The columns to include in the WHERE clause.
space_factor : int, default 1
The factor to multiply the `self._stmt_space` with to determine
the level of indentation of the columns in the SET and WHERE clause.
"""
update_stmt = self._update_table_stmt.replace(':table', table)
update_stmt = update_stmt.replace(
':update_cols',
f',\n{self._stmt_space*space_factor}'.join(f'{col} = :{col}' for col in update_cols)
)
update_stmt = update_stmt.replace(
':where_cols',
f' AND\n{self._stmt_space*space_factor}'.join(f'{col} = :{col}' for col in where_cols)
)
return update_stmt
def _create_insert_into_where_not_exists_statement(self,
table: str,
insert_cols: Sequence[str],
where_cols: Sequence[str],
select_values_space_factor: int = 2,
where_cols_space_factor: int = 4) -> str:
r"""Create an "INSERT INTO WHERE NOT EXISTS" statement with placeholders parametrized.
Creates the statement from the template `self._insert_into_where_not_exists_stmt`.
Parameters
----------
table : str
The name of the table to insert values into.
insert_cols : Sequence[str]
The columns to insert values into.
where_cols : Sequence[str]
The columns to include in the WHERE clause.
select_values_space_factor : int, default 2
The factor to multiply the `self._stmt_space` with to determine
the level of indentation of the columns in the SELECT clause.
where_cols_space_factor : int, default 4
The factor to multiply the `self._stmt_space` with to determine
the level of indentation of the columns in the WHERE clause.
"""
insert_stmt = self._insert_into_where_not_exists_stmt.replace(':table', table)
stmt_space = self._stmt_space
insert_stmt = insert_stmt.replace(
':insert_cols',
f',\n{stmt_space}'.join(insert_cols)
)
insert_stmt = insert_stmt.replace(
':select_values',
f',\n{stmt_space*select_values_space_factor}'.join(f':{col}' for col in insert_cols)
)
insert_stmt = insert_stmt.replace(
':where_cols',
f' AND\n{stmt_space*where_cols_space_factor}'.join(f'{col} = :{col}' for col in where_cols)
)
return insert_stmt
def _create_merge_statement(self,
table: str,
insert_cols: Sequence[str],
on_cols: Sequence[str],
update_cols: Sequence[str],
target_prefix: str = 't',
source_prefix: str = 's',
omit_update_where_clause: bool = True,
when_not_matched_by_source_delete: bool = False) -> str:
r"""Create a MERGE statement with placeholders parametrized.
Creates the statement from the template `self._merge_df_stmt`.
Parameters
----------
table : str
The name of the table to merge values into.
insert_cols : Sequence[str]
The columns to insert values into.
on_cols : Sequence[str]
The columns to include in the ON clause.
update_cols : Sequence[str]
The columns to update.
target_prefix : str, default 't'
The prefix to use for columns of the target table in the database.
source_prefix : str, default 's'
The prefix to use for columns of the source table (the DataFrame)
to merge into the target table.
omit_update_where_clause : bool, default True
If the WHERE clause of the UPDATE clause should be omitted.
when_not_matched_by_source_delete : bool, default False
If the THE WHEN NOT MATCHED BY SOURCE DELETE clause should be included.
This clause is only supported by Microsoft SQL Server.
"""
t = target_prefix
s = source_prefix
stmt_space = self._stmt_space
# Table name
merge_stmt = self._merge_df_stmt.replace(':table', table)
# Prefixes
merge_stmt = merge_stmt.replace(':target', t)
merge_stmt = merge_stmt.replace(':source', s)
# USING SELECT values
merge_stmt = merge_stmt.replace(
':select_values',
f',\n{stmt_space*2}'.join(f':{col} AS {col}' for col in insert_cols)
)
# ON Clause
merge_stmt = merge_stmt.replace(
':on',
f' AND\n{stmt_space}'.join(f'{t}.{col} = {s}.{col}' for col in on_cols)
)
# WHEN MATCHED THEN UPDATE
merge_stmt = merge_stmt.replace(
':update_cols',
f',\n{stmt_space*2}'.join(f't.{col} = s.{col}' for col in update_cols)
)
# WHEN MATCHED THEN UPDATE WHERE
if omit_update_where_clause:
merge_stmt = merge_stmt.replace(f'{stmt_space}WHERE\n{stmt_space*2}:update_where_cols\n', '')
else:
merge_stmt = merge_stmt.replace(
':update_where_cols',
f' OR\n{stmt_space*2}'.join(f't.{col} <> s.{col}' for col in update_cols)
)
# WHEN NOT MATCHED THEN INSERT INTO
merge_stmt = merge_stmt.replace(
':insert_cols',
f',\n{stmt_space*2}'.join(f'{t}.{col}' for col in insert_cols)
)
# WHEN NOT MATCHED THEN INSERT VALUES
merge_stmt = merge_stmt.replace(
':insert_values',
f',\n{stmt_space*2}'.join(f'{s}.{col}' for col in insert_cols)
)
# Optionally use WHEN NOT MATCHED BY SOURCE DELETE (Only Microsoft SQL Server)
return merge_stmt
[docs] def manage_foreign_keys(self, conn: Connection, action: str) -> None:
r"""Manage how the database handles foreign key constraints.
Should be implemented by DatabaseManagers whose SQL dialect
supports enabling/disabling checking foreign key constraints.
E.g. SQLite.
Parameters
----------
conn : sqlalchemy.engine.base.Connection
An open connection to the database.
action : str
How to handle foreign key constraints in the database.
Raises
------
pandemy.ExecuteStatementError
If changing the handling of foreign key constraint fails.
pandemy.InvalidInputError
If invalid input is supplied to `action`.
See Also
--------
* :meth:`~DatabaseManager.execute` : Execute a SQL statement.
Examples
--------
Enable and trigger a foreign key constraint using a SQLite in-memory database.
>>> import pandemy
>>> db = pandemy.SQLiteDb() # Create an in-memory database
>>> with db.engine.begin() as conn:
... db.execute(
... sql="CREATE TABLE Owner(OwnerId INTEGER PRIMARY KEY, OwnerName TEXT)",
... conn=conn
... )
... db.execute(
... sql=(
... "CREATE TABLE Store("
... "StoreId INTEGER PRIMARY KEY, "
... "StoreName TEXT, "
... "OwnerId INTEGER REFERENCES Owner(OwnerId)"
... ")"
... ),
... conn=conn
... )
... db.execute(
... sql="INSERT INTO Owner(OwnerId, OwnerName) VALUES(1, 'Shop keeper')",
... conn=conn
... )
... db.execute(
... sql=(
... "INSERT INTO Store(StoreId, StoreName, OwnerId) "
... "VALUES(1, 'Lumbridge General Supplies', 2)"
... ),
... conn=conn # OwnerId = 2 is not a valid FOREIGN KEY reference
... )
... db.manage_foreign_keys(conn=conn, action='ON')
... db.execute(
... sql=(
... "INSERT INTO Store(StoreId, StoreName, OwnerId) "
... "VALUES(1, 'Falador General Store', 3)"
... ),
... conn=conn # OwnerId = 3 is not a valid FOREIGN KEY reference
... ) # doctest: +IGNORE_EXCEPTION_DETAIL, +ELLIPSIS
Traceback (most recent call last):
...
pandemy.exceptions.ExecuteStatementError: IntegrityError: ('(sqlite3.IntegrityError) FOREIGN KEY constraint failed',)
"""
[docs] def execute(self, sql: Union[str, TextClause], conn: Connection, params: Union[dict, List[dict], None] = None):
r"""Execute a SQL statement.
Parameters
----------
sql : str or sqlalchemy.sql.elements.TextClause
The SQL statement to execute. A string value is automatically converted to a
:class:`sqlalchemy.sql.elements.TextClause` with the
:func:`sqlalchemy.sql.expression.text` function.
conn : sqlalchemy.engine.base.Connection
An open connection to the database.
params : dict or list of dict or None, default None
Parameters to bind to the SQL statement `sql`.
Parameters in the SQL statement should be prefixed by a colon (*:*) e.g. ``:myparameter``.
Parameters in `params` should *not* contain the colon (``{'myparameter': 'myvalue'}``).
Supply a list of parameter dictionaries to execute multiple
parametrized statements in the same method call, e.g.
``[{'parameter1': 'a string'}, {'parameter2': 100}]``.
This is useful for INSERT, UPDATE and DELETE statements.
Returns
-------
sqlalchemy.engine.CursorResult
A result object from the executed statement.
Raises
------
pandemy.ExecuteStatementError
If an error occurs when executing the statement.
pandemy.InvalidInputError
If `sql` is not of type str or :class:`sqlalchemy.sql.elements.TextClause`.
See Also
--------
* :meth:`sqlalchemy.engine.Connection.execute` : The method used for executing the SQL statement.
Examples
--------
To process the result from the method the database connection must remain open after the method
is executed i.e. the context manager *cannot* be closed before processing the result:
.. code-block:: python
import pandemy
db = SQLiteDb(file='Runescape.db')
with db.engine.connect() as conn:
result = db.execute('SELECT * FROM StoreSupply', conn=conn)
for row in result:
print(row) # process the result
...
"""
if isinstance(sql, str):
stmt = text(sql)
elif isinstance(sql, TextClause):
stmt = sql
else:
raise pandemy.InvalidInputError(f'Invalid type {type(sql)} for sql. '
'Expected str or sqlalchemy.sql.elements.TextClause.')
try:
if params is None:
return conn.execute(stmt)
else:
return conn.execute(stmt, params) # Parameters to bind to the sql statement
except Exception as e:
raise pandemy.ExecuteStatementError(f'{type(e).__name__}: {e.args}', data=e.args) from None
[docs] def delete_all_records_from_table(self, table: str, conn: Connection) -> None:
r"""Delete all records from the specified table.
Parameters
----------
table : str
The table to delete all records from.
conn : sqlalchemy.engine.base.Connection
An open connection to the database.
Raises
------
pandemy.DeleteFromTableError
If data cannot be deleted from the table.
pandemy.InvalidTableNameError
If the supplied table name is invalid.
See Also
--------
* :meth:`~DatabaseManager.load_table` : Load a SQL table into a :class:`pandas.DataFrame`.
* :meth:`~DatabaseManager.save_df` : Save a :class:`pandas.DataFrame` to a table in the database.
Examples
--------
Delete all records from a table in a SQLite in-memory database.
>>> import pandas as pd
>>> import pandemy
>>> df = pd.DataFrame(data=[
... [1, 'Lumbridge General Supplies', 'Lumbridge', 1],
... [2, 'Varrock General Store', 'Varrock', 2],
... [3, 'Falador General Store', 'Falador', 3]
... ],
... columns=['StoreId', 'StoreName', 'Location', 'OwnerId']
... )
>>> df = df.set_index('StoreId')
>>> df # doctest: +NORMALIZE_WHITESPACE
StoreName Location OwnerId
StoreId
1 Lumbridge General Supplies Lumbridge 1
2 Varrock General Store Varrock 2
3 Falador General Store Falador 3
>>> db = pandemy.SQLiteDb() # Create an in-memory database
>>> with db.engine.begin() as conn:
... db.save_df(df=df, table='Store', conn=conn)
... df_loaded = db.load_table(sql='Store', conn=conn, index_col='StoreId')
>>> df_loaded # doctest: +NORMALIZE_WHITESPACE
StoreName Location OwnerId
StoreId
1 Lumbridge General Supplies Lumbridge 1
2 Varrock General Store Varrock 2
3 Falador General Store Falador 3
>>> with db.engine.begin() as conn:
... db.delete_all_records_from_table(table='Store', conn=conn)
... df_loaded = db.load_table(sql='Store', conn=conn, index_col='StoreId')
>>> assert df_loaded.empty
>>> df_loaded # doctest: +NORMALIZE_WHITESPACE
Empty DataFrame
Columns: [StoreName, Location, OwnerId]
Index: []
"""
self._is_valid_table_name(table=table)
# SQL statement to delete all existing data from the table
sql = self._delete_from_table_stmt.replace(':table', table)
try:
conn.execute(sql)
except SQLAlchemyError as e:
raise pandemy.DeleteFromTableError(f'Could not delete records from table: {table}: {e.args[0]}',
data=e.args) from None
else:
logger.debug(f'Successfully deleted existing data from table {table}.')
[docs] def save_df(
self,
df: pd.DataFrame,
table: str,
conn: Connection,
if_exists: str = 'append',
index: bool = True,
index_label: Optional[Union[str, Sequence[str]]] = None,
chunksize: Optional[int] = None,
schema: Optional[str] = None,
dtype: Optional[Union[Dict[str, Union[str, object]], object]] = None,
datetime_cols_dtype: Optional[str] = None,
datetime_format: str = r'%Y-%m-%d %H:%M:%S',
localize_tz: Optional[str] = None,
target_tz: Optional[str] = None,
method: Optional[Union[str, Callable]] = None) -> None:
r"""Save the :class:`pandas.DataFrame` `df` to specified table in the database.
If the table does not exist it will be created. If the table already exists
the column names of the :class:`pandas.DataFrame` `df` must match the table column definitions.
Uses :meth:`pandas.DataFrame.to_sql` method to write the :class:`pandas.DataFrame` to the database.
Parameters
----------
df : pandas.DataFrame
The DataFrame to save to the database.
table : str
The name of the table where to save the :class:`pandas.DataFrame`.
conn : sqlalchemy.engine.base.Connection
An open connection to the database.
if_exists : str, {'append', 'replace', 'drop-replace', 'fail'}
How to update an existing table in the database:
* 'append': Append `df` to the existing table.
* 'replace': Delete all records from the table and then write `df` to the table.
* 'drop-replace': Drop the table, recreate it, and then write `df` to the table.
* 'fail': Raise :exc:`pandemy.TableExistsError` if the table exists.
.. versionadded:: 1.2.0
'drop-replace'
index : bool, default True
Write :class:`pandas.DataFrame` index as a column. Uses the name of the index as the
column name for the table.
index_label : str or sequence of str or None, default None
Column label for index column(s). If None is given (default) and `index` is True,
then the index names are used. A sequence should be given if the :class:`pandas.DataFrame`
uses a :class:`pandas.MultiIndex`.
chunksize : int or None, default None
The number of rows in each batch to be written at a time.
If None, all rows will be written at once.
schema : str, None, default None
Specify the schema (if database flavor supports this). If None, use default schema.
dtype : dict or scalar, default None
Specifying the data type for columns. If a dictionary is used, the keys should be the column names
and the values should be the SQLAlchemy types or strings for the sqlite3 legacy mode.
If a scalar is provided, it will be applied to all columns.
datetime_cols_dtype : {'str', 'int'} or None, default None
If the datetime columns of `df` should be converted to string or integer data types
before saving the table. If ``None`` no conversion of datetime columns is performed,
which is the default. When using ``'int'`` the datetime columns are converted to the
number of seconds since the Unix Epoch of 1970-01-01. The timezone of the datetime
columns should be in UTC when using ``'int'``.
.. versionadded:: 1.2.0
datetime_format : str, default r'%Y-%m-%d %H:%M:%S'
The datetime (:meth:`strftime <datetime.datetime.strftime>`) format
to use when converting datetime columns to strings.
.. versionadded:: 1.2.0
localize_tz : str or None, default None
The name of the timezone which to localize naive datetime columns into.
If None (the default) timezone localization is omitted.
.. versionadded:: 1.2.0
target_tz : str or None, default None
The name of the target timezone to convert timezone aware datetime columns, or columns that
have been localized by `localize_tz`, into. If None (the default) timezone conversion is omitted.
.. versionadded:: 1.2.0
method : None, 'multi', callable, default None
Controls the SQL insertion clause used:
* None:
Uses standard SQL INSERT clause (one per row).
* 'multi':
Pass multiple values in a single INSERT clause. It uses a special SQL syntax not supported by
all backends. This usually provides better performance for analytic databases like Presto and
Redshift, but has worse performance for traditional SQL backend if the table contains many
columns. For more information check the SQLAlchemy documentation.
* callable with signature (pd_table, conn, keys, data_iter):
This can be used to implement a more performant insertion method based on specific
backend dialect features.
See: `pandas SQL insertion method`_.
Raises
------
pandemy.DeleteFromTableError
If data in the table cannot be deleted when ``if_exists='replace'``.
pandemy.InvalidInputError
Invalid values or types for input parameters or if the timezone localization or conversion fails.
pandemy.InvalidTableNameError
If the supplied table name is invalid.
pandemy.SaveDataFrameError
If the :class:`pandas.DataFrame` cannot be saved to the table.
pandemy.TableExistsError
If the table exists and ``if_exists='fail'``.
See Also
--------
* :meth:`~DatabaseManager.load_table` : Load a SQL table into a :class:`pandas.DataFrame`.
* :meth:`~DatabaseManager.merge_df` : Merge data from a :class:`pandas.DataFrame` into a table.
* :meth:`~DatabaseManager.upsert_table` : Update a table with a :class:`pandas.DataFrame` and insert new rows if any.
* :meth:`pandas.DataFrame.to_sql` : Write records stored in a DataFrame to a SQL database.
* `pandas SQL insertion method`_ : Details about using the `method` parameter.
.. _pandas SQL insertion method: https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method
Examples
--------
Save a :class:`pandas.DataFrame` to a SQLite in-memory database.
>>> import pandas as pd
>>> import pandemy
>>> df = pd.DataFrame(data=[
... [1, 'Lumbridge General Supplies', 'Lumbridge', 1],
... [2, 'Varrock General Store', 'Varrock', 2],
... [3, 'Falador General Store', 'Falador', 3]
... ],
... columns=['StoreId', 'StoreName', 'Location', 'OwnerId']
... )
>>> df = df.set_index('StoreId')
>>> df # doctest: +NORMALIZE_WHITESPACE
StoreName Location OwnerId
StoreId
1 Lumbridge General Supplies Lumbridge 1
2 Varrock General Store Varrock 2
3 Falador General Store Falador 3
>>> db = pandemy.SQLiteDb() # Create an in-memory database
>>> with db.engine.begin() as conn:
... db.save_df(df=df, table='Store', conn=conn)
"""
# ==========================================
# Validate input
# ==========================================
self._validate_chunksize(chunksize=chunksize)
# Validate if_exists
if not isinstance(if_exists, str) or if_exists not in {'append', 'replace', 'drop-replace', 'fail'}:
raise pandemy.InvalidInputError(f'Invalid input if_exists = {if_exists}. '
"Expected 'append', 'replace', 'drop-replace' or 'fail'.")
# Validate table
if not isinstance(table, str):
raise pandemy.InvalidInputError(f'table must be a string. Got {type(table)}. table = {table}')
# ==========================================
# Process existing table
# ==========================================
if if_exists == 'replace':
# self._is_valid_table_name(table=table) is called within delete_all_records_from_table
self.delete_all_records_from_table(table=table, conn=conn)
if_exists = 'append'
elif if_exists == 'drop-replace':
self._is_valid_table_name(table=table)
if_exists = 'replace' # replace is the name for drop-replace in pandas.DataFrame.to_sql
else:
self._is_valid_table_name(table=table)
# ==========================================
# Convert datetime columns
# ==========================================
if datetime_cols_dtype is None and localize_tz is None and target_tz is None:
df_save = df
else:
df_save = pandemy._datetime.convert_datetime_columns(
df=df,
dtype=datetime_cols_dtype,
datetime_format=datetime_format,
localize_tz=localize_tz,
target_tz=target_tz
)
# ==========================================
# Write the DataFrame to the SQL table
# ==========================================
try:
df_save.to_sql(
table,
con=conn,
if_exists=if_exists,
index=index,
index_label=index_label,
chunksize=chunksize,
schema=schema,
dtype=dtype,
method=method
)
except ValueError as e:
if re.search(fr'.*{table}.+ already exists', e.args[0]):
raise pandemy.TableExistsError(
f'Table {table} already exists! and if_exists = {if_exists!r}', data=table
) from None
else:
raise pandemy.SaveDataFrameError(
f'Could not save DataFrame to table {table}: {e.args}', data=e.args
) from None
# Unexpected error
except Exception as e:
raise pandemy.SaveDataFrameError(
f'Could not save DataFrame to table {table}: {e.args}', data=e.args
) from None
else:
nr_cols = df.shape[1] + len(df.index.names) if index else df.shape[1]
nr_rows = df.shape[0]
logger.debug(f'Successfully wrote {nr_rows} rows over {nr_cols} columns to table {table}.')
[docs] def load_table(
self,
sql: Union[str, TextClause],
conn: Connection,
params: Optional[Dict[str, str]] = None,
index_col: Optional[Union[str, Sequence[str]]] = None,
columns: Optional[Sequence[str]] = None,
parse_dates: Optional[Union[list, dict]] = None,
datetime_cols_dtype: Optional[str] = None,
datetime_format: str = r'%Y-%m-%d %H:%M:%S',
localize_tz: Optional[str] = None,
target_tz: Optional[str] = None,
dtypes: Optional[Dict[str, Union[str, object]]] = None,
chunksize: Optional[int] = None,
coerce_float: bool = True
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
r"""Load a SQL table into a :class:`pandas.DataFrame`.
Specify a table name or a SQL query to load the :class:`pandas.DataFrame` from.
Uses :func:`pandas.read_sql` function to read from the database.
Parameters
----------
sql : str or sqlalchemy.sql.elements.TextClause
The table name or SQL query.
conn : sqlalchemy.engine.base.Connection
An open connection to the database to use for the query.
params : dict of str or None, default None
Parameters to bind to the SQL query `sql`.
Parameters in the SQL query should be prefixed by a colon (*:*) e.g. ``:myparameter``.
Parameters in `params` should *not* contain the colon (``{'myparameter': 'myvalue'}``).
index_col : str or sequence of str or None, default None
The column(s) to set as the index of the :class:`pandas.DataFrame`.
columns : list of str or None, default None
List of column names to select from the SQL table (only used when `sql` is a table name).
parse_dates : list or dict or None, default None
* List of column names to parse into datetime columns.
* Dict of `{column_name: format string}` where format string is strftime compatible
in case of parsing string times, or is one of (D, s, ns, ms, us)
in case of parsing integer timestamps.
* Dict of `{column_name: arg dict}`, where the arg dict corresponds to the keyword arguments of
:func:`pandas.to_datetime`. Especially useful with databases without native datetime support,
such as SQLite.
datetime_cols_dtype : {'str', 'int'} or None, default None
If the datetime columns of the loaded DataFrame `df` should be converted to string or integer
data types. If ``None`` conversion of datetime columns is omitted, which is the default.
When using ``'int'`` the datetime columns are converted to the number of seconds since the
Unix Epoch of 1970-01-01. The timezone of the datetime columns should be in UTC when using ``'int'``.
You may need to specify `parse_dates` in order for columns be converted into datetime columns
depending on the SQL driver.
.. versionadded:: 1.2.0
datetime_format : str, default r'%Y-%m-%d %H:%M:%S'
The datetime (:meth:`strftime <datetime.datetime.strftime>`) format
to use when converting datetime columns to strings.
.. versionadded:: 1.2.0
localize_tz : str or None, default None
The name of the timezone which to localize naive datetime columns into.
If None (the default) timezone localization is omitted.
target_tz : str or None, default None
The name of the target timezone to convert the datetime columns into after they have been localized.
If None (the default) timezone conversion is omitted.
dtypes : dict or None, default None
Desired data types for specified columns `{'column_name': data type}`.
Use pandas or numpy data types or string names of those.
If None no data type conversion is performed.
chunksize : int or None, default None
If `chunksize` is specified an iterator of DataFrames will be returned where `chunksize`
is the number of rows in each :class:`pandas.DataFrame`.
If `chunksize` is supplied timezone localization and conversion as well as dtype
conversion cannot be performed i.e. `localize_tz`, `target_tz` and `dtypes` have
no effect.
coerce_float : bool, default True
Attempts to convert values of non-string, non-numeric objects (like decimal.Decimal)
to floating point, useful for SQL result sets.
Returns
-------
df : pandas.DataFrame or Iterator[pandas.DataFrame]
:class:`pandas.DataFrame` with the result of the query or an iterator of DataFrames
if `chunksize` is specified.
Raises
------
pandemy.DataTypeConversionError
If errors when converting data types using the `dtypes` parameter.
pandemy.InvalidInputError
Invalid values or types for input parameters or if the timezone localization or conversion fails.
pandemy.LoadTableError
If errors when loading the table using :func:`pandas.read_sql`.
pandemy.SetIndexError
If setting the index of the returned :class:`pandas.DataFrame` fails when `index_col` is specified
and chunksize is None.
See Also
--------
* :meth:`~DatabaseManager.save_df` : Save a :class:`pandas.DataFrame` to a table in the database.
* :func:`pandas.read_sql` : Read SQL query or database table into a :class:`pandas.DataFrame`.
* :func:`pandas.to_datetime` : The function used for datetime conversion with `parse_dates`.
Examples
--------
When specifying the `chunksize` parameter the database connection must remain open
to be able to process the DataFrames from the iterator. The processing
*must* occur *within* the context manager:
.. code-block:: python
import pandemy
db = pandemy.SQLiteDb(file='Runescape.db')
with db.engine.connect() as conn:
df_gen = db.load_table(sql='ItemTradedInStore', conn=conn, chunksize=3)
for df in df_gen:
print(df) # Process your DataFrames
...
"""
# To get the correct index column(s) if chunksize is specified
# Not using index_col in pd.read_sql allows to set the data type of the index explicitly
index_col_pd_read_sql = None if chunksize is None else index_col
try:
df = pd.read_sql(sql, con=conn, params=params, parse_dates=parse_dates, index_col=index_col_pd_read_sql,
columns=columns, chunksize=chunksize, coerce_float=coerce_float)
except Exception as e:
raise pandemy.LoadTableError(f'{e.args}\nsql={sql}\nparams={params}\ncolumns={columns}',
data=(e.args, sql, params, columns)) from None
if chunksize:
return df
if len(df.index) == 0:
logger.warning('No rows were returned from the query.')
# Convert specified columns to desired data types
if dtypes is not None:
logger.debug('Convert columns to desired data types.')
error_msg: str = ''
data: Optional[tuple] = None
try:
df = df.astype(dtype=dtypes)
except KeyError:
# The keys that are not in the columns
difference = ', '.join([key for key in dtypes.keys() if key not in (cols := set(df.columns))])
cols = df.columns.tolist()
error_msg = (
f'Only column names can be used for the keys in dtypes parameter.'
f'\nColumns : {cols}\ndtypes : {dtypes}\n'
f'Difference: {difference}'
)
data=(cols, dtypes, difference)
except TypeError as e:
error_msg = f'Cannot convert data types: {e.args[0]}.\ndtypes={dtypes}'
data=(e.args, dtypes)
if error_msg:
raise pandemy.DataTypeConversionError(message=error_msg, data=data)
# Localize (and convert) to desired timezone
if datetime_cols_dtype is None and localize_tz is None and target_tz is None:
pass
else:
df = pandemy._datetime.convert_datetime_columns(
df=df,
dtype=datetime_cols_dtype,
datetime_format=datetime_format,
localize_tz=localize_tz,
target_tz=target_tz
)
# Nr of rows and columns retrieved by the query
nr_rows = df.shape[0]
nr_cols = df.shape[1]
# Set the index/indices column(s)
if index_col is not None:
error_msg = ''
try:
df.set_index(index_col, inplace=True)
except KeyError as e:
error_msg = f'Cannot set index to {index_col}: {e.args[0]}.'
except TypeError as e:
error_msg = f'Cannot set index to {index_col}: {e.args[0].replace("""keys""", "index_col")}.'
if error_msg:
raise pandemy.SetIndexError(message=error_msg, data=index_col)
logger.debug(f'Successfully loaded {nr_rows} rows and {nr_cols} columns.')
return df
[docs] def upsert_table(
self,
df: pd.DataFrame,
table: str,
conn: Connection,
where_cols: Sequence[str],
upsert_cols: Optional[Union[str, Sequence[str]]] = 'all',
upsert_index_cols: Union[bool, Sequence[str]] = False,
update_only: bool = False,
chunksize: Optional[int] = None,
nan_to_none: bool = True,
datetime_cols_dtype: Optional[str] = None,
datetime_format: str = r'%Y-%m-%d %H:%M:%S',
localize_tz: Optional[str] = None,
target_tz: Optional[str] = None,
dry_run: bool = False
) -> Union[Tuple[CursorResult, Optional[CursorResult]], Tuple[str, Optional[str]], Tuple[None, None]]:
r"""Update a table with data from a :class:`pandas.DataFrame` and insert new rows if any.
This method executes an UPDATE statement followed by an INSERT statement (UPSERT)
to update the rows of a table with a :class:`pandas.DataFrame` and insert new rows.
The INSERT statement can be omitted with the `update_only` parameter.
The column names of `df` and `table` must match.
.. versionadded:: 1.2.0
Parameters
----------
df : pandas.DataFrame
The DataFrame with data to upsert.
table : str
The name of the table to upsert.
conn : sqlalchemy.engine.base.Connection
An open connection to the database.
where_cols : Sequence[str]
The columns from `df` to use in the WHERE clause to identify the rows to upsert.
upsert_cols : str or Sequence[str] or None, default 'all'
The columns from `table` to upsert with data from `df`.
The default string ``'all'`` will upsert all columns.
If ``None`` no columns will be selected for upsert. This is useful if only columns
of the index of `df` should be upserted by specifying `upsert_index_cols`.
upsert_index_cols : bool or Sequence[str], default False
If the index columns of `df` should be included in the columns to upsert.
``True`` indicates that the index should be included. If the index is a :class:`pandas.MultiIndex`
a sequence of strings that maps against the levels to include can be used to only include the desired levels.
``False`` excludes the index column(s) from being upserted which is the default.
update_only : bool, default False
If ``True`` the `table` should only be updated and new rows not inserted.
If ``False`` (the default) perform an update and insert new rows.
chunksize : int or None, default None
Divide `df` into chunks and perform the upsert in chunks of `chunksize` rows.
If None all rows of `df` are processed in one chunk, which is the default.
If `df` has many rows dividing it into chunks may increase performance.
nan_to_none : bool, default True
If columns with missing values (NaN values) that are of type :attr:`pandas.NA` :attr:`pandas.NaT`
or :attr:`numpy.nan` should be converted to standard Python ``None``. Some databases do not support
these types in parametrized SQL statements.
datetime_cols_dtype : {'str', 'int'} or None, default None
If the datetime columns of `df` should be converted to string or integer data types
before upserting the table. SQLite cannot handle datetime objects as parameters
and should use this option. If ``None`` conversion of datetime columns is omitted,
which is the default. When using ``'int'`` the datetime columns are converted to the
number of seconds since the Unix Epoch of 1970-01-01. The timezone of the datetime
columns should be in UTC when using ``'int'``.
datetime_format : str, default r'%Y-%m-%d %H:%M:%S'
The datetime (:meth:`strftime <datetime.datetime.strftime>`) format
to use when converting datetime columns to strings.
localize_tz : str or None, default None
The name of the timezone which to localize naive datetime columns into.
If None (the default) timezone localization is omitted.
target_tz : str or None, default None
The name of the target timezone to convert timezone aware datetime columns, or columns that have been
localized by `localize_tz`, into. If None (the default) timezone conversion is omitted.
dry_run : bool, default False
Do not execute the upsert. Instead return the SQL statements that would have been executed on the database.
The return value is a tuple ('UPDATE statement', 'INSERT statement'). If `update_only` is ``True`` the
INSERT statement will be ``None``.
Returns
-------
Tuple[sqlalchemy.engine.CursorResult, Optional[sqlalchemy.engine.CursorResult]] or Tuple[str, Optional[str]] or Tuple[None, None]
Result objects from the executed statements or the SQL statements that would have been executed
if `dry_run` is ``True``. The result objects will be ``None`` if `df` is empty.
Raises
------
pandemy.ExecuteStatementError
If an error occurs when executing the UPDATE and or INSERT statement.
pandemy.InvalidColumnNameError
If a column name of `upsert_cols` or `upsert_index_cols` are not
among the columns or index of `df`.
pandemy.InvalidInputError
Invalid values or types for input parameters or if the timezone localization or conversion fails.
pandemy.InvalidTableNameError
If the supplied table name is invalid.
See Also
--------
:meth:`~DatabaseManager.execute()` : Execute a SQL statement.
:meth:`~DatabaseManager.load_table()` : Load a table into a :class:`pandas.DataFrame`.
:meth:`~DatabaseManager.merge_df()` : Merge data from a :class:`pandas.DataFrame` into a table.
:meth:`~DatabaseManager.save_df()` : Save a :class:`pandas.DataFrame` to a table in the database.
Examples
--------
Create a simple table called *Customer* and insert some data from a :class:`pandas.DataFrame` (``df``).
Change the first row and add a new row to ``df``. Finally upsert the table with ``df``.
>>> import pandas as pd
>>> import pandemy
>>> df = pd.DataFrame(data={
... 'CustomerId': [1, 2],
... 'CustomerName': ['Zezima', 'Dr Harlow']
... }
... )
>>> df = df.set_index('CustomerId')
>>> df # doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS
CustomerName
CustomerId
1 Zezima
2 Dr Harlow
>>> db = pandemy.SQLiteDb() # Create an in-memory database
>>> with db.engine.begin() as conn:
... _ = db.execute(
... sql=(
... 'CREATE TABLE Customer('
... 'CustomerId INTEGER PRIMARY KEY, '
... 'CustomerName TEXT NOT NULL)'
... ),
... conn=conn
... )
... db.save_df(df=df, table='Customer', conn=conn)
>>> df.loc[1, 'CustomerName'] = 'Baraek' # Change data
>>> df.loc[3, 'CustomerName'] = 'Mosol Rei' # Add new data
>>> df # doctest: +NORMALIZE_WHITESPACE
CustomerName
CustomerId
1 Baraek
2 Dr Harlow
3 Mosol Rei
>>> with db.engine.begin() as conn:
... _, _ = db.upsert_table(
... df=df,
... table='Customer',
... conn=conn,
... where_cols=['CustomerId'],
... upsert_index_cols=True
... )
... df_upserted = db.load_table(
... sql='SELECT * FROM Customer ORDER BY CustomerId ASC',
... conn=conn,
... index_col='CustomerId'
... )
>>> df_upserted # doctest: +NORMALIZE_WHITESPACE
CustomerName
CustomerId
1 Baraek
2 Dr Harlow
3 Mosol Rei
"""
self._is_valid_table_name(table=table)
self._validate_chunksize(chunksize=chunksize)
df_upsert, update_cols, insert_cols = self._prepare_input_data_for_modify_statements(
df=df,
update_cols=upsert_cols,
update_index_cols=upsert_index_cols,
where_cols=where_cols
)
# Create the UPDATE statement
update_stmt = self._create_update_statement(
table=table,
update_cols=update_cols,
where_cols=where_cols,
space_factor=1
)
# Create the INSERT statement
if not update_only:
insert_stmt = self._create_insert_into_where_not_exists_statement(
table=table,
insert_cols=insert_cols,
where_cols=where_cols,
select_values_space_factor=2,
where_cols_space_factor=4
)
else:
insert_stmt = None
if dry_run: # Early exit to check the statements
return update_stmt, insert_stmt
if datetime_cols_dtype is None and localize_tz is None and target_tz is None:
pass
else:
df_upsert = pandemy._datetime.convert_datetime_columns(
df=df_upsert,
dtype=datetime_cols_dtype,
datetime_format=datetime_format,
localize_tz=localize_tz,
target_tz=target_tz
)
# Convert missing values
if nan_to_none and df_upsert.isna().any().any(): # If at least 1 missing value
df_upsert = pandemy._dataframe.convert_nan_to_none(df=df_upsert)
# Turn the DataFrame into an iterator yielding a list of dicts [{parameter: value}, {...}]
params_iter = pandemy._dataframe.df_to_parameters_in_chunks(df=df_upsert, chunksize=chunksize)
# Perform the UPDATE and optionally INSERT
result_update, result_insert = None, None # In case of df_upsert being empty
for chunk, params in enumerate(params_iter, start=1):
logger.debug(f'UPSERT {len(params)} rows, chunk {chunk}')
try:
result_update = conn.execute(text(update_stmt), params)
result_insert = conn.execute(text(insert_stmt), params) if not update_only else None
except Exception as e:
log_level = 40 # ERROR
raise pandemy.ExecuteStatementError(f'{type(e).__name__}: {e.args}', data=e.args) from None
else:
log_level = 10 # DEBUG
finally:
logger.log(log_level, f'UPDATE statement for table {table}:\n{update_stmt}\n')
logger.log(log_level, f'update_only={update_only}')
logger.log(log_level, f'INSERT statement for table {table}:\n{insert_stmt}')
logger.log(
log_level,
'params:\n{params_row}'.format(params_row="\n".join(str(row) for row in params))
)
return result_update, result_insert
[docs] def merge_df(
self,
df: pd.DataFrame,
table: str,
conn: Connection,
on_cols: Sequence[str],
merge_cols: Optional[Union[str, Sequence[str]]] = 'all',
merge_index_cols: Union[bool, Sequence[str]] = False,
omit_update_where_clause: bool = True,
chunksize: Optional[int] = None,
nan_to_none: bool = True,
datetime_cols_dtype: Optional[str] = None,
datetime_format: str = r'%Y-%m-%d %H:%M:%S',
localize_tz: Optional[str] = None,
target_tz: Optional[str] = None,
dry_run: bool = False) -> Union[CursorResult, str, None]:
r"""Merge data from a :class:`pandas.DataFrame` into a table.
This method executes a combined UPDATE and INSERT statement on a table using the
MERGE statement. The method is similar to :meth:`~DatabaseManager.upsert_table()`
but it only executes one statement instead of two.
Databases implemented in Pandemy that support the MERGE statement:
- Oracle
The column names of `df` and `table` must match.
.. versionadded:: 1.2.0
Parameters
----------
df : pandas.DataFrame
The DataFrame with data to merge into `table`.
table : str
The name of the table to merge data into.
conn : sqlalchemy.engine.base.Connection
An open connection to the database.
on_cols : Sequence[str] or None
The columns from `df` to use in the ON clause to identify how to merge rows into `table`.
merge_cols : str or Sequence[str] or None, default 'all'
The columns of `table` to merge (update or insert) with data from `df`.
The default string ``'all'`` will update all columns.
If ``None`` no columns will be selected for merge. This is useful if only columns
of the index of `df` should be updated by specifying `merge_index_cols`.
merge_index_cols : bool or Sequence[str], default False
If the index columns of `df` should be included in the columns to merge.
``True`` indicates that the index should be included. If the index is a :class:`pandas.MultiIndex`
a sequence of strings that maps against the levels to include can be used to only include the desired levels.
``False`` excludes the index column(s) from being updated which is the default.
omit_update_where_clause : bool, default True
If the WHERE clause of the UPDATE clause should be omitted from the MERGE statement.
The WHERE clause is implemented as OR conditions where the target and source columns to update
are not equal.
Databases in Pandemy that support this option are: Oracle
Example of the SQL generated when ``omit_update_where_clause=True``:
.. code-block::
[...]
WHEN MATCHED THEN
UPDATE
SET
t.IsAdventurer = s.IsAdventurer,
t.CustomerId = s.CustomerId,
t.CustomerName = s.CustomerName
WHERE
t.IsAdventurer <> s.IsAdventurer OR
t.CustomerId <> s.CustomerId OR
t.CustomerName <> s.CustomerName
[...]
chunksize : int or None, default None
Divide `df` into chunks and perform the merge in chunks of `chunksize` rows.
If None all rows of `df` are processed in one chunk, which is the default.
If `df` has many rows dividing it into chunks may increase performance.
nan_to_none : bool, default True
If columns with missing values (NaN values) that are of type :attr:`pandas.NA` :attr:`pandas.NaT`
or :attr:`numpy.nan` should be converted to standard Python ``None``. Some databases do not support
these types in parametrized SQL statements.
datetime_cols_dtype : {'str', 'int'} or None, default None
If the datetime columns of `df` should be converted to string or integer data types
before updating the table. If ``None`` conversion of datetime columns is omitted,
which is the default. When using ``'int'`` the datetime columns are converted to the
number of seconds since the Unix Epoch of 1970-01-01. The timezone of the datetime
columns should be in UTC when using ``'int'``.
datetime_format : str, default r'%Y-%m-%d %H:%M:%S'
The datetime (:meth:`strftime <datetime.datetime.strftime>`) format
to use when converting datetime columns to strings.
localize_tz : str or None, default None
The name of the timezone which to localize naive datetime columns into.
If None (the default) timezone localization is omitted.
target_tz : str or None, default None
The name of the target timezone to convert timezone aware datetime columns, or columns that have been
localized by `localize_tz`, into. If None (the default) timezone conversion is omitted.
dry_run : bool, default False
Do not execute the merge. Instead return the SQL statement
that would have been executed on the database as a string.
Returns
-------
sqlalchemy.engine.CursorResult or str or None
A result object from the executed statement or the SQL statement
that would have been executed if `dry_run` is ``True``.
The result object will be ``None`` if `df` is empty.
Raises
------
pandemy.ExecuteStatementError
If an error occurs when executing the MERGE statement.
pandemy.InvalidColumnNameError
If a column name of `merge_cols`, `merge_index_cols` or `on_cols` are not
among the columns or index of the input DataFrame `df`.
pandemy.InvalidInputError
Invalid values or types for input parameters or if the timezone localization or conversion fails.
pandemy.InvalidTableNameError
If the supplied table name is invalid.
pandemy.SQLStatementNotSupportedError
If the database dialect does not support the MERGE statement.
See Also
--------
* :meth:`~DatabaseManager.save_df()` : Save a :class:`pandas.DataFrame` to specified table in the database.
* :meth:`~DatabaseManager.upsert_table()` : Update a table with a :class:`pandas.DataFrame` and optionally insert new rows.
Examples
--------
Create a MERGE statement from an empty :class:`pandas.DataFrame` that represents a table in a database by
using the parameter ``dry_run=True``. The :meth:`begin() <sqlalchemy.engine.Engine.begin>` method of the
:class:`DatabaseManager.engine <sqlalchemy.engine.Engine>` is mocked to avoid having to connect to a real database.
>>> import pandas as pd
>>> import pandemy
>>> from unittest.mock import MagicMock
>>> df = pd.DataFrame(columns=['ItemId', 'ItemName', 'MemberOnly', 'IsAdventurer'])
>>> df = df.set_index('ItemId')
>>> df # doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS
Empty DataFrame
Columns: [ItemName, MemberOnly, IsAdventurer]
Index: []
>>> db = pandemy.OracleDb(
... username='Fred_the_Farmer',
... password='Penguins-sheep-are-not',
... host='fred.farmer.rs',
... port=1234,
... service_name='woollysheep'
... )
>>> db.engine.begin = MagicMock() # Mock the begin method
>>> with db.engine.begin() as conn:
... merge_stmt = db.merge_df(
... df=df,
... table='Item',
... conn=conn,
... on_cols=['ItemName'],
... merge_cols='all',
... merge_index_cols=False,
... dry_run=True
... )
>>> print(merge_stmt) # doctest: +NORMALIZE_WHITESPACE
MERGE INTO Item t
<BLANKLINE>
USING (
SELECT
:ItemName AS ItemName,
:MemberOnly AS MemberOnly,
:IsAdventurer AS IsAdventurer
FROM DUAL
) s
<BLANKLINE>
ON (
t.ItemName = s.ItemName
)
<BLANKLINE>
WHEN MATCHED THEN
UPDATE
SET
t.MemberOnly = s.MemberOnly,
t.IsAdventurer = s.IsAdventurer
<BLANKLINE>
WHEN NOT MATCHED THEN
INSERT (
t.ItemName,
t.MemberOnly,
t.IsAdventurer
)
VALUES (
s.ItemName,
s.MemberOnly,
s.IsAdventurer
)
"""
self._supports_merge_statement()
self._is_valid_table_name(table=table)
self._validate_chunksize(chunksize=chunksize)
df_merge, update_cols, insert_cols = self._prepare_input_data_for_modify_statements(
df=df,
update_cols=merge_cols,
update_index_cols=merge_index_cols,
where_cols=on_cols
)
# Create the MERGE statement
merge_stmt = self._create_merge_statement(
table=table,
insert_cols=insert_cols,
on_cols=on_cols,
update_cols=update_cols,
omit_update_where_clause=omit_update_where_clause
)
if dry_run: # Early exit to check the statement
return merge_stmt
if datetime_cols_dtype is None and localize_tz is None and target_tz is None:
pass
else:
df_merge = pandemy._datetime.convert_datetime_columns(
df=df_merge,
dtype=datetime_cols_dtype,
datetime_format=datetime_format,
localize_tz=localize_tz,
target_tz=target_tz
)
# Convert missing values
if nan_to_none and df_merge.isna().any().any(): # If at least 1 missing value
df_merge = pandemy._dataframe.convert_nan_to_none(df=df_merge)
# Turn the DataFrame into an iterator yielding a list of dicts [{parameter: value}, {...}]
params_iter = pandemy._dataframe.df_to_parameters_in_chunks(df=df_merge, chunksize=chunksize)
# Perform the MERGE
result_merge = None # In case of df_merge being empty
for chunk, params in enumerate(params_iter, start=1):
logger.debug(f'MERGE {len(params)} rows, chunk {chunk}')
try:
result_merge = conn.execute(text(merge_stmt), params)
except Exception as e:
log_level = 40 # ERROR
raise pandemy.ExecuteStatementError(f'{type(e).__name__}: {e.args}', data=e.args) from None
else:
log_level = 10 # DEBUG
finally:
logger.log(log_level, f'MERGE statement for table {table}:\n{merge_stmt}\n')
logger.log(
log_level,
'params:\n{params_row}'.format(params_row="\n".join(str(row) for row in params))
)
return result_merge
# ===============================================================
# SQLite
# ===============================================================
[docs]class SQLiteDb(DatabaseManager):
r"""A SQLite :class:`DatabaseManager`.
Parameters
----------
file : str or pathlib.Path, default ':memory:'
The path (absolute or relative) to the SQLite database file.
The default creates an in-memory database.
must_exist : bool, default False
If ``True`` validate that `file` exists unless ``file=':memory:'``.
If it does not exist :exc:`pandemy.DatabaseFileNotFoundError`
is raised. If ``False`` the validation is omitted.
container : SQLContainer or None, default None
A container of database statements that the SQLite :class:`DatabaseManager` can use.
driver : str, default 'sqlite3'
The database driver to use. The default is the Python built-in module :mod:`sqlite3`,
which is also the default driver of SQLAlchemy.
When the default is used no driver name is displayed in the connection URL.
.. versionadded:: 1.2.0
url : :class:`str` or :class:`sqlalchemy.engine.URL` or None, default None
A SQLAlchemy connection URL to use for creating the database engine.
It overrides the value of `file` and `must_exist`.
.. versionadded:: 1.2.0
connect_args : dict or None, default None
Additional arguments sent to the driver upon connection that further
customizes the connection.
.. versionadded:: 1.2.0
engine_config : dict or None, default None
Additional keyword arguments passed to the :func:`sqlalchemy.create_engine` function.
engine : :class:`sqlalchemy.engine.Engine` or None, default None
A SQLAlchemy Engine to use as the database engine of :class:`SQLiteDb`.
It overrides the value of `file` and `must_exist`. If specified the value
of `url` should be ``None``. If ``None`` (the default) the engine will be
created from `file` or `url`.
.. versionadded:: 1.2.0
**kwargs : dict
Additional keyword arguments that are not used by :class:`SQLiteDb`.
Raises
------
pandemy.CreateConnectionURLError
If there are errors with `url`.
pandemy.CreateEngineError
If the creation of the database engine fails.
pandemy.DatabaseFileNotFoundError
If the database `file` does not exist when ``must_exist=True``.
pandemy.InvalidInputError
If the parameters are specified with invalid input.
See Also
--------
* :class:`pandemy.DatabaseManager` : The parent class.
* :func:`sqlalchemy.create_engine` : The function used to create the database engine.
* `SQLite dialect and drivers`_ : The SQLite dialect and drivers in SQLAlchemy.
* `SQLite <https://sqlite.org/index.html>`_ : The SQLite homepage.
.. _SQLite dialect and drivers: https://docs.sqlalchemy.org/en/14/dialects/sqlite.html
"""
__slots__ = ('file', 'must_exist', 'driver')
def __init__(
self,
file: Union[str, Path] = ':memory:',
must_exist: bool = False,
container: Optional[pandemy.SQLContainer] = None,
driver: str = 'sqlite3',
url: Optional[Union[str, URL]] = None,
connect_args: Optional[Dict[str, Any]] = None,
engine_config: Optional[Dict[str, Any]] = None,
engine: Optional[Engine] = None,
**kwargs
) -> None:
# file
if isinstance(file, Path) or file == ':memory:':
self.file = file
elif isinstance(file, str):
self.file = Path(file)
else:
raise pandemy.InvalidInputError(
f'file must be a string or pathlib.Path. Received: {file}, {type(file)}'
)
# must_exist
if isinstance(must_exist, bool):
self.must_exist = must_exist
else:
raise pandemy.InvalidInputError(
f'must_exist must be a boolean. Received: {must_exist}, {type(must_exist)}.'
)
if file != ':memory:' and url is None and engine is None:
if not self.file.exists() and must_exist:
raise pandemy.DatabaseFileNotFoundError(
f'file={self.file!r} does not exist and and must_exist={must_exist!r}. '
'Cannot instantiate the SQLite DatabaseManager.'
)
self.driver = driver
# Build the connection URL
if url is None and engine is None:
try:
url = URL.create(
drivername='sqlite' if driver == 'sqlite3' else f'sqlite+{driver}',
database=str(self.file)
)
except Exception as e:
raise pandemy.CreateConnectionURLError(message=f'{type(e).__name__}: {e.args}', data=e.args) from None
elif url is not None and engine is None:
url = make_url(url)
self.file = url.database
self.driver = 'sqlite3' if len(dvr := url.drivername.split('+')) == 1 else dvr[1] # drivername = backend+driver
super().__init__(
url=url,
container=container,
connect_args=connect_args,
engine_config=engine_config,
engine=engine
)
def __str__(self):
r"""String representation of the object."""
return f"SQLiteDb(file='{self.file}', must_exist={self.must_exist})"
@property
def conn_str(self) -> str:
r"""Backwards compatibility for the `conn_str` attribute.
The `conn_str` attribute is deprecated in version 1.2.0 and replaced by the url attribute.
"""
if pandemy.__versiontuple__[:3] >= (1, 2, 0):
warnings.warn(
message='The conn_str attribute is deprecated in version 1.2.0 and replaced by url. Use SQLiteDb.url instead.',
category=DeprecationWarning,
stacklevel=2
)
return str(self.url)
[docs] def manage_foreign_keys(self, conn: Connection, action: str = 'ON') -> None:
r"""Manage how the database handles foreign key constraints.
In SQLite the check of foreign key constraints is not enabled by default.
Parameters
----------
conn : sqlalchemy.engine.base.Connection
An open connection to the database.
action : {'ON', 'OFF'}
Enable ('ON') or disable ('OFF') the check of foreign key constraints.
Raises
------
pandemy.ExecuteStatementError
If the enabling/disabling of the foreign key constraints fails.
pandemy.InvalidInputError
If invalid input is supplied to `action`.
See Also
--------
* :meth:`~DatabaseManager.execute` : Execute a SQL statement.
Examples
--------
Enable and trigger a foreign key constraint using an in-memory database.
>>> import pandemy
>>> db = pandemy.SQLiteDb() # Create an in-memory database
>>> with db.engine.begin() as conn:
... db.execute(
... sql="CREATE TABLE Owner(OwnerId INTEGER PRIMARY KEY, OwnerName TEXT)",
... conn=conn
... )
... db.execute(
... sql=(
... "CREATE TABLE Store("
... "StoreId INTEGER PRIMARY KEY, "
... "StoreName TEXT, "
... "OwnerId INTEGER REFERENCES Owner(OwnerId)"
... ")"
... ),
... conn=conn
... )
... db.execute(
... sql="INSERT INTO Owner(OwnerId, OwnerName) VALUES(1, 'Shop keeper')",
... conn=conn
... )
... db.execute(
... sql=(
... "INSERT INTO Store(StoreId, StoreName, OwnerId) "
... "VALUES(1, 'Lumbridge General Supplies', 2)"
... ),
... conn=conn # OwnerId = 2 is not a valid FOREIGN KEY reference
... )
... db.manage_foreign_keys(conn=conn, action='ON')
... db.execute(
... sql=(
... "INSERT INTO Store(StoreId, StoreName, OwnerId) "
... "VALUES(1, 'Falador General Store', 3)"
... ),
... conn=conn # OwnerId = 3 is not a valid FOREIGN KEY reference
... ) # doctest: +IGNORE_EXCEPTION_DETAIL, +ELLIPSIS
Traceback (most recent call last):
...
pandemy.exceptions.ExecuteStatementError: IntegrityError: ('(sqlite3.IntegrityError) FOREIGN KEY constraint failed',)
"""
actions = {'ON', 'OFF'}
if not isinstance(action, str):
error = True
elif action not in actions:
error = True
else:
error = False
if error:
raise pandemy.InvalidInputError(f'Invalid input action = {action}. Allowed values: {actions}',
data=(action, actions))
try:
conn.execute(f'PRAGMA foreign_keys = {action};')
except Exception as e:
raise pandemy.ExecuteStatementError(f'{type(e).__name__}: {e.args}', data=e.args) from None
[docs]class OracleDb(DatabaseManager):
r"""An Oracle :class:`DatabaseManager`.
Requires the `cx_Oracle`_ package to be able
to create a connection to the database.
To use a DSN connection string specified in the Oracle connection config file, *tnsnames.ora*,
set `host` to the desired network service name in *tnsnames.ora* and leave
`port`, `service_name` and `sid` as ``None``. Using a *tnsnames.ora* file is needed
to connect to `Oracle Cloud Autononmous Databases`_.
.. versionadded:: 1.1.0
.. _cx_Oracle: https://oracle.github.io/python-cx_Oracle/
Parameters
----------
username : str or None, default None
The username of the database account.
Must be specified if `url` or `engine` are ``None``.
password : str or None, default None
The password of the database account.
Must be specified if `url` or `engine` are ``None``.
host : str or None, default None
The host name or server IP-address where the database is located.
Must be specified if `url` or `engine` are ``None``.
port : int or str or None, default None
The port the `host` is listening on.
The default port of Oracle databases is 1521.
service_name : str or None, default None
The name of the service used for the database connection.
sid : str or None, default None
The SID used for the connection. SID is the name of the database instance on the `host`.
Note that `sid` and `service_name` should not be specified at the same time.
container : SQLContainer or None, default None
A container of database statements that :class:`OracleDb` can use.
driver : str, default 'cx_oracle'
The database driver to use.
.. versionadded:: 1.2.0
url : :class:`str` or :class:`sqlalchemy.engine.URL` or None, default None
A SQLAlchemy connection URL to use for creating the database engine.
Specifying `url` overrides the parameters: `username`, `password`, `host`
`port`, `service_name`, `sid` and `driver`. If `url` is specified
`engine` should be ``None``.
connect_args : dict or None, default None
Additional arguments sent to the driver upon connection that further
customizes the connection.
engine_config : dict or None, default None
Additional keyword arguments passed to the :func:`sqlalchemy.create_engine` function.
engine : :class:`sqlalchemy.engine.Engine` or None, default None
A SQLAlchemy Engine to use as the database engine of :class:`OracleDb`.
If ``None`` (the default) the engine will be created from the other parameters.
When specified it overrides the parameters: `username`, `password`, `host`, `port`,
`service_name`, `sid` and `driver`. If specified `url` should be ``None``.
**kwargs : dict
Additional keyword arguments that are not used by :class:`OracleDb`.
Raises
------
pandemy.CreateConnectionURLError
If the creation of the connection URL fails.
pandemy.CreateEngineError
If the creation of the database engine fails.
pandemy.InvalidInputError
If invalid combinations of the parameters are used.
See Also
--------
* :class:`pandemy.DatabaseManager` : The parent class.
* :func:`sqlalchemy.create_engine` : The function used to create the database engine.
* `The cx_Oracle database driver`_ : Details of the cx_Oracle driver and its usage in SQLAlchemy.
* `cx_Oracle documentation <https://cx-oracle.readthedocs.io/en/latest/>`_
* `Specifying connect_args`_ : Details about the `connect_args` parameter.
* `tnsnames.ora`_ : Oracle connection config file.
* `Oracle Cloud Autononmous Databases`_
.. _The cx_Oracle database driver: https://docs.sqlalchemy.org/en/14/dialects/oracle.html#module-sqlalchemy.dialects.oracle.cx_oracle
.. _Specifying connect_args: https://docs.sqlalchemy.org/en/14/core/engines.html#custom-dbapi-args
.. _tnsnames.ora: https://docs.oracle.com/database/121/NETRF/tnsnames.htm#NETRF259
.. _Oracle Cloud Autononmous Databases : https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#connecting-to-oracle-cloud-autonomous-databases
Examples
--------
Create an instance of :class:`OracleDb` and connect using a service:
>>> db = pandemy.OracleDb(
... username='Fred_the_Farmer',
... password='Penguins-sheep-are-not',
... host='fred.farmer.rs',
... port=1234,
... service_name='woollysheep'
... )
>>> str(db)
'OracleDb(oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234?service_name=woollysheep)'
>>> db.username
'Fred_the_Farmer'
>>> db.password
'Penguins-sheep-are-not'
>>> db.host
'fred.farmer.rs'
>>> db.port
1234
>>> db.service_name
'woollysheep'
>>> db.url
oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234?service_name=woollysheep
>>> db.engine
Engine(oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234?service_name=woollysheep)
Connect with a DSN connection string using a net service name from a *tnsnames.ora* config file
and supply additional connection arguments and engine configuration:
>>> import cx_Oracle
>>> connect_args = {
... 'encoding': 'UTF-8',
... 'nencoding': 'UTF-8',
... 'mode': cx_Oracle.SYSDBA,
... 'events': True
... }
>>> engine_config = {
... 'coerce_to_unicode': False,
... 'arraysize': 40,
... 'auto_convert_lobs': False
... }
>>> db = pandemy.OracleDb(
... username='Fred_the_Farmer',
... password='Penguins-sheep-are-not',
... host='my_dsn_name',
... connect_args=connect_args,
... engine_config=engine_config
... )
>>> db
OracleDb(
username='Fred_the_Farmer',
password='***',
host='my_dsn_name',
port=None,
service_name=None,
sid=None,
driver='cx_oracle',
url=oracle+cx_oracle://Fred_the_Farmer:***@my_dsn_name,
container=None,
connect_args={'encoding': 'UTF-8', 'nencoding': 'UTF-8', 'mode': 2, 'events': True},
engine_config={'coerce_to_unicode': False, 'arraysize': 40, 'auto_convert_lobs': False},
engine=Engine(oracle+cx_oracle://Fred_the_Farmer:***@my_dsn_name)
)
If you are familiar with the connection URL syntax of SQLAlchemy you can create an instance of
:class:`OracleDb` directly from a URL:
>>> url = 'oracle+cx_oracle://Fred_the_Farmer:Penguins-sheep-are-not@my_dsn_name'
>>> db = pandemy.OracleDb(url=url)
>>> db
OracleDb(
username='Fred_the_Farmer',
password='***',
host='my_dsn_name',
port=None,
service_name=None,
sid=None,
driver='cx_oracle',
url=oracle+cx_oracle://Fred_the_Farmer:***@my_dsn_name,
container=None,
connect_args={},
engine_config={},
engine=Engine(oracle+cx_oracle://Fred_the_Farmer:***@my_dsn_name)
)
If you already have a database :class:`engine <sqlalchemy.engine.Engine>` and would like to use it
with :class:`OracleDb` simply create the instance like this:
>>> from sqlalchemy import create_engine
>>> url = 'oracle+cx_oracle://Fred_the_Farmer:Penguins-sheep-are-not@fred.farmer.rs:1234/shears'
>>> engine = create_engine(url, coerce_to_unicode=False)
>>> db = pandemy.OracleDb(engine=engine)
>>> db
OracleDb(
username='Fred_the_Farmer',
password='***',
host='fred.farmer.rs',
port=1234,
service_name=None,
sid='shears',
driver='cx_oracle',
url=oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234/shears,
container=None,
connect_args={},
engine_config={},
engine=Engine(oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234/shears)
)
This is useful if you have special needs for creating the engine that cannot be accomplished
with the engine constructor of :class:`OracleDb`. See for instance the `cx_Oracle SessionPool`_
example in the SQLAlchemy docs.
.. _cx_Oracle SessionPool: https://docs.sqlalchemy.org/en/14/dialects/oracle.html#using-cx-oracle-sessionpool
"""
# Class variables
# ---------------
# Template statement to insert new rows, that do not exist already, into a table.
_insert_into_where_not_exists_stmt: str = (
"""INSERT INTO :table (
:insert_cols
)
SELECT
:select_values
FROM DUAL
WHERE
NOT EXISTS (
SELECT
1
FROM :table
WHERE
:where_cols
)"""
)
# MERGE DataFrame statement
_merge_df_stmt: str = (
"""MERGE INTO :table :target
USING (
SELECT
:select_values
FROM DUAL
) :source
ON (
:on
)
WHEN MATCHED THEN
UPDATE
SET
:update_cols
WHERE
:update_where_cols
WHEN NOT MATCHED THEN
INSERT (
:insert_cols
)
VALUES (
:insert_values
)"""
)
__slots__ = ('username', 'password', 'host', 'port', 'service_name', 'sid', 'driver')
def __init__(self,
username: Optional[str] = None,
password: Optional[str] = None,
host: Optional[str] = None,
port: Optional[Union[int, str]] = None,
service_name: Optional[str] = None,
sid: Optional[str] = None,
container: Optional[pandemy.SQLContainer] = None,
driver: str = 'cx_oracle',
url: Optional[Union[str, URL]] = None,
connect_args: Optional[Dict[str, Any]] = None,
engine_config: Optional[Dict[str, Any]] = None,
engine: Optional[Engine] = None,
**kwargs: dict
) -> None:
# Set parameters from provided url or engine
if url is not None or engine is not None:
try:
url = make_url(url) if url is not None else engine.url
except Exception as e:
raise pandemy.CreateConnectionURLError(message=f'{type(e).__name__}: {e.args}', data=e.args) from None
self.username = url.username
self.password = url.password
self.host = url.host
self.port = url.port
self.service_name = url.query.get('service_name')
self.sid = url.database
if len(dvr := url.drivername.split('+')) == 1 or not dvr[1]: # drivername = backend+driver
raise pandemy.CreateConnectionURLError(f'The url does not contain a driver. url={url}', data=url)
else:
self.driver = dvr[1]
# Build the connection URL
else:
required_not_none = {'username': username, 'password': password, 'host': host}
if len(none_result := {key: value for key, value in required_not_none.items() if value is None}) == 3:
raise pandemy.InvalidInputError(
'username, password, host, url and engine cannot be None at the same time.'
)
elif len(none_result) in {1, 2}:
raise pandemy.InvalidInputError(
'username, password and host must all be specified if url and engine are None. '
f'Got: {required_not_none}',
data=required_not_none
)
else:
pass
# Check that service_name and sid are not used together
if service_name is not None and sid is not None:
raise pandemy.InvalidInputError(
'Use either service_name or sid to connect to the database, not both! '
f'service={service_name!r}, sid={sid!r}',
data=(service_name, sid)
)
port = port if port is None else int(port)
self.username = username
self.password = password
self.host = host
self.port = port
self.service_name = service_name
self.sid = sid
self.driver = driver
try:
url = URL.create(
drivername=f'oracle+{driver}',
username=username,
password=password,
host=host,
port=port,
database=sid,
query={'service_name': service_name} if service_name is not None else {}
)
except Exception as e:
raise pandemy.CreateConnectionURLError(message=f'{type(e).__name__}: {e.args}', data=e.args) from None
super().__init__(
url=url if engine is None else None,
container=container,
connect_args=connect_args,
engine_config=engine_config,
engine=engine
)
[docs] @classmethod
def from_url(cls,
url: Union[str, URL],
container: Optional[pandemy.SQLContainer] = None,
engine_config: Optional[Dict[str, Any]] = None) -> OracleDb:
r"""Create an instance of :class:`OracleDb` from a SQLAlchemy :class:`URL <sqlalchemy.engine.URL>`.
.. deprecated:: 1.2.0
Use the `url` parameter of the normal initializer of :class:`OracleDb` instead.
Parameters
----------
url : str or sqlalchemy.engine.URL
A SQLAlchemy URL to use for creating the database engine.
container : SQLContainer or None, default None
A container of database statements that :class:`OracleDb` can use.
engine_config : dict or None, default None
Additional keyword arguments passed to the :func:`sqlalchemy.create_engine` function.
Raises
------
pandemy.CreateConnectionURLError
If `url` is invalid.
Examples
--------
If you are familiar with the connection URL syntax of SQLAlchemy you can create an instance of
:class:`OracleDb` directly from a URL:
>>> url = 'oracle+cx_oracle://Fred_the_Farmer:Penguins-sheep-are-not@my_dsn_name'
>>> db = pandemy.OracleDb.from_url(url)
>>> db
OracleDb(
username='Fred_the_Farmer',
password='***',
host='my_dsn_name',
port=None,
service_name=None,
sid=None,
driver='cx_oracle',
url=oracle+cx_oracle://Fred_the_Farmer:***@my_dsn_name,
container=None,
connect_args={},
engine_config={},
engine=Engine(oracle+cx_oracle://Fred_the_Farmer:***@my_dsn_name)
)
"""
if pandemy.__versiontuple__[:3] >= (1, 2, 0):
warnings.warn(
message=(
'from_url is deprecated in version 1.2.0. '
'Use the url parameter of the normal initializer of OracleDb instead. '
'db = pandemy.OracleDb(url=url)'
),
category=DeprecationWarning,
stacklevel=2
)
return cls(url=url, container=container, engine_config=engine_config)
[docs] @classmethod
def from_engine(cls, engine: Engine, container: Optional[pandemy.SQLContainer] = None) -> OracleDb:
r"""Create an instance of :class:`OracleDb` from a SQLAlchemy :class:`Engine <sqlalchemy.engine.Engine>`.
.. deprecated:: 1.2.0
Use the `engine` parameter of the normal initializer of :class:`OracleDb` instead.
Parameters
----------
engine : sqlalchemy.engine.Engine
A SQLAlchemy Engine to use as the database engine of :class:`OracleDb`.
container : SQLContainer or None, default None
A container of database statements that :class:`OracleDb` can use.
Examples
--------
If you already have a database :class:`engine <sqlalchemy.engine.Engine>` and would like to use it
with :class:`OracleDb` simply create the instance like this:
>>> from sqlalchemy import create_engine
>>> url = 'oracle+cx_oracle://Fred_the_Farmer:Penguins-sheep-are-not@fred.farmer.rs:1234/shears'
>>> engine = create_engine(url, coerce_to_unicode=False)
>>> db = pandemy.OracleDb.from_engine(engine)
>>> db
OracleDb(
username='Fred_the_Farmer',
password='***',
host='fred.farmer.rs',
port=1234,
service_name=None,
sid='shears',
driver='cx_oracle',
url=oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234/shears,
container=None,
connect_args={},
engine_config={},
engine=Engine(oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234/shears)
)
This is useful if you have special needs for creating the engine that cannot be accomplished
with the engine constructor of :class:`OracleDb`. See for instance the `cx_Oracle SessionPool`_
example in the SQLAlchemy docs.
.. _cx_Oracle SessionPool: https://docs.sqlalchemy.org/en/14/dialects/oracle.html#using-cx-oracle-sessionpool
"""
if pandemy.__versiontuple__[:3] >= (1, 2, 0):
warnings.warn(
message=(
'from_engine is deprecated in version 1.2.0. '
'Use the engine parameter of the normal initializer of OracleDb instead. '
'db = pandemy.OracleDb(engine=engine)'
),
category=DeprecationWarning,
stacklevel=2
)
return cls(engine=engine, container=container)