DatabaseManager
DatabaseManager is the base class providing the methods to interact with databases.
- class pandemy.DatabaseManager(url=None, container=None, connect_args=None, engine_config=None, engine=None, **kwargs)[source]
Bases: object
Base class with functionality for managing a database.
Each database type will subclass from DatabaseManager and implement the initializer which is specific to each database type. Initialization of a DatabaseManager creates the database engine, which is used to connect to and interact with the database.
The DatabaseManager can be used on its own but with limited functionality and the initialization requires a SQLAlchemy URL or Engine, which require some knowledge about SQLAlchemy.
Note
Some methods like upsert_table() and merge_df() use dialect specific SQL syntax. These methods may not work properly if using the DatabaseManager directly. DatabaseManager should only be used if there is no subclass implemented that matches the desired SQL dialect.
- Parameters
url (str or sqlalchemy.engine.URL or None, default None) – A SQLAlchemy connection URL to use for creating the database engine. If None an engine is expected to be supplied to the engine parameter.
container (SQLContainer or None, default None) – A container of database statements that DatabaseManager can use.
connect_args (dict or None, default None) – Additional arguments sent to the driver upon connection that further customize the connection.
engine_config (dict or None, default None) – Additional keyword arguments passed to the sqlalchemy.create_engine() function.
engine (sqlalchemy.engine.Engine or None, default None) – A SQLAlchemy Engine to use as the database engine of DatabaseManager. If None (the default) the engine will be created from url.
**kwargs (dict) – Additional keyword arguments that are not used by DatabaseManager.
- Raises
pandemy.CreateConnectionURLError – If there are errors with url.
pandemy.CreateEngineError – If the creation of the database engine fails.
pandemy.InvalidInputError – If url and engine are both specified or are both None.
See also
OracleDb : An Oracle DatabaseManager.
SQLiteDb : A SQLite DatabaseManager.
Examples
Create an instance of a DatabaseManager that connects to a SQLite in-memory database.
>>> import pandemy
>>> db = pandemy.DatabaseManager(url='sqlite://')
>>> db
DatabaseManager(
    url=sqlite://,
    container=None,
    connect_args={},
    engine_config={},
    engine=Engine(sqlite://)
)
- delete_all_records_from_table(table, conn)[source]
Delete all records from the specified table.
- Parameters
table (str) – The table to delete all records from.
conn (sqlalchemy.engine.base.Connection) – An open connection to the database.
- Raises
pandemy.DeleteFromTableError – If data cannot be deleted from the table.
pandemy.InvalidTableNameError – If the supplied table name is invalid.
See also
load_table() : Load a SQL table into a pandas.DataFrame.
save_df() : Save a pandas.DataFrame to a table in the database.
Examples
Delete all records from a table in a SQLite in-memory database.
>>> import pandas as pd
>>> import pandemy
>>> df = pd.DataFrame(data=[
...         [1, 'Lumbridge General Supplies', 'Lumbridge', 1],
...         [2, 'Varrock General Store', 'Varrock', 2],
...         [3, 'Falador General Store', 'Falador', 3]
...     ],
...     columns=['StoreId', 'StoreName', 'Location', 'OwnerId']
... )
>>> df = df.set_index('StoreId')
>>> df
                          StoreName   Location  OwnerId
StoreId
1        Lumbridge General Supplies  Lumbridge        1
2             Varrock General Store    Varrock        2
3             Falador General Store    Falador        3
>>> db = pandemy.SQLiteDb()  # Create an in-memory database
>>> with db.engine.begin() as conn:
...     db.save_df(df=df, table='Store', conn=conn)
...     df_loaded = db.load_table(sql='Store', conn=conn, index_col='StoreId')
>>> df_loaded
                          StoreName   Location  OwnerId
StoreId
1        Lumbridge General Supplies  Lumbridge        1
2             Varrock General Store    Varrock        2
3             Falador General Store    Falador        3
>>> with db.engine.begin() as conn:
...     db.delete_all_records_from_table(table='Store', conn=conn)
...     df_loaded = db.load_table(sql='Store', conn=conn, index_col='StoreId')
>>> assert df_loaded.empty
>>> df_loaded
Empty DataFrame
Columns: [StoreName, Location, OwnerId]
Index: []
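The effect of the method can also be pictured with Python's built-in sqlite3 module. This is only a sketch and not Pandemy's implementation: the helper delete_all_records and its table name rule (letters, digits and underscores) are assumptions made for illustration. A table name cannot be bound as a SQL parameter, which is why the sketch validates the name before placing it in the statement.

```python
import re
import sqlite3


def delete_all_records(conn: sqlite3.Connection, table: str) -> None:
    """Delete every row from `table` while keeping the table itself.

    Hypothetical helper for illustration; the name validation rule below
    is an assumption and not the rule Pandemy applies.
    """
    # Identifiers cannot be bound as parameters, so validate before formatting.
    if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', table):
        raise ValueError(f'Invalid table name: {table!r}')
    conn.execute(f'DELETE FROM {table}')


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE Store(StoreId INTEGER PRIMARY KEY, StoreName TEXT)')
conn.executemany(
    'INSERT INTO Store(StoreId, StoreName) VALUES (?, ?)',
    [(1, 'Lumbridge General Supplies'), (2, 'Varrock General Store')],
)

delete_all_records(conn, 'Store')

# The rows are gone but the column definitions remain.
remaining = conn.execute('SELECT COUNT(*) FROM Store').fetchone()[0]
columns = [row[1] for row in conn.execute('PRAGMA table_info(Store)')]
print(remaining, columns)
conn.close()
```

The table keeps its columns after the delete, which matches the empty DataFrame with intact column names shown in the example above.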
- execute(sql, conn, params=None)[source]
Execute a SQL statement.
- Parameters
sql (str or sqlalchemy.sql.elements.TextClause) – The SQL statement to execute. A string value is automatically converted to a sqlalchemy.sql.elements.TextClause with the sqlalchemy.sql.expression.text() function.
conn (sqlalchemy.engine.base.Connection) – An open connection to the database.
params (dict or list of dict or None, default None) – Parameters to bind to the SQL statement sql. Parameters in the SQL statement should be prefixed by a colon (:) e.g. :myparameter. Parameters in params should not contain the colon ({'myparameter': 'myvalue'}).
Supply a list of parameter dictionaries to execute multiple parametrized statements in the same method call, e.g. [{'parameter1': 'a string'}, {'parameter2': 100}]. This is useful for INSERT, UPDATE and DELETE statements.
- Returns
A result object from the executed statement.
- Return type
- Raises
pandemy.ExecuteStatementError – If an error occurs when executing the statement.
pandemy.InvalidInputError – If sql is not of type str or sqlalchemy.sql.elements.TextClause.
See also
sqlalchemy.engine.Connection.execute() : The method used for executing the SQL statement.
Examples
To process the result from the method the database connection must remain open after the method is executed i.e. the context manager cannot be closed before processing the result:
import pandemy

db = pandemy.SQLiteDb(file='Runescape.db')

with db.engine.connect() as conn:
    result = db.execute('SELECT * FROM StoreSupply', conn=conn)

    for row in result:
        print(row)  # process the result
        ...
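The colon-prefixed parameter style described for params can be tried without Pandemy, since Python's built-in sqlite3 module accepts the same :name placeholders. The sketch below uses sqlite3 as a stand-in and is not a call to execute(); the table Item and its columns are made up for illustration. With Pandemy the dictionary, or the list of dictionaries, would be passed through the params parameter instead.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE Item(ItemId INTEGER PRIMARY KEY, ItemName TEXT, Price INTEGER)')

insert_sql = 'INSERT INTO Item(ItemId, ItemName, Price) VALUES (:item_id, :item_name, :price)'

# One dictionary per statement execution; the keys omit the leading colon.
params = [
    {'item_id': 1, 'item_name': 'Pot', 'price': 1},
    {'item_id': 2, 'item_name': 'Jug', 'price': 2},
    {'item_id': 3, 'item_name': 'Shears', 'price': 3},
]
conn.executemany(insert_sql, params)

# A single dictionary binds the parameters of one statement.
rows = conn.execute(
    'SELECT ItemName FROM Item WHERE Price >= :min_price ORDER BY ItemId',
    {'min_price': 2},
).fetchall()
print(rows)
conn.close()
```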
- load_table(sql, conn, params=None, index_col=None, columns=None, parse_dates=None, datetime_cols_dtype=None, datetime_format='%Y-%m-%d %H:%M:%S', localize_tz=None, target_tz=None, dtypes=None, chunksize=None, coerce_float=True)[source]
Load a SQL table into a pandas.DataFrame.
Specify a table name or a SQL query to load the pandas.DataFrame from. Uses the pandas.read_sql() function to read from the database.
- Parameters
sql (str or sqlalchemy.sql.elements.TextClause) – The table name or SQL query.
conn (sqlalchemy.engine.base.Connection) – An open connection to the database to use for the query.
params (dict of str or None, default None) – Parameters to bind to the SQL query sql. Parameters in the SQL query should be prefixed by a colon (:) e.g. :myparameter. Parameters in params should not contain the colon ({'myparameter': 'myvalue'}).
index_col (str or sequence of str or None, default None) – The column(s) to set as the index of the pandas.DataFrame.
columns (list of str or None, default None) – List of column names to select from the SQL table (only used when sql is a table name).
parse_dates (list or dict or None, default None) –
List of column names to parse into datetime columns.
Dict of {column_name: format string} where format string is strftime compatible in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.
Dict of {column_name: arg dict}, where the arg dict corresponds to the keyword arguments of pandas.to_datetime(). Especially useful with databases without native datetime support, such as SQLite.
datetime_cols_dtype ({'str', 'int'} or None, default None) – If the datetime columns of the loaded DataFrame df should be converted to string or integer data types. If None conversion of datetime columns is omitted, which is the default. When using 'int' the datetime columns are converted to the number of seconds since the Unix Epoch of 1970-01-01. The timezone of the datetime columns should be in UTC when using 'int'. You may need to specify parse_dates in order for columns to be converted into datetime columns depending on the SQL driver.
New in version 1.2.0.
datetime_format (str, default r'%Y-%m-%d %H:%M:%S') – The datetime (strftime) format to use when converting datetime columns to strings.
New in version 1.2.0.
localize_tz (str or None, default None) – The name of the timezone which to localize naive datetime columns into. If None (the default) timezone localization is omitted.
target_tz (str or None, default None) – The name of the target timezone to convert the datetime columns into after they have been localized. If None (the default) timezone conversion is omitted.
dtypes (dict or None, default None) – Desired data types for specified columns {‘column_name’: data type}. Use pandas or numpy data types or string names of those. If None no data type conversion is performed.
chunksize (int or None, default None) – If chunksize is specified an iterator of DataFrames will be returned where chunksize is the number of rows in each pandas.DataFrame. If chunksize is supplied timezone localization and conversion as well as dtype conversion cannot be performed i.e. localize_tz, target_tz and dtypes have no effect.
coerce_float (bool, default True) – Attempts to convert values of non-string, non-numeric objects (like decimal.Decimal) to floating point, useful for SQL result sets.
- Returns
df – pandas.DataFrame with the result of the query or an iterator of DataFrames if chunksize is specified.
- Return type
pandas.DataFrame or Iterator[pandas.DataFrame]
- Raises
pandemy.DataTypeConversionError – If errors occur when converting data types using the dtypes parameter.
pandemy.InvalidInputError – Invalid values or types for input parameters or if the timezone localization or conversion fails.
pandemy.LoadTableError – If errors occur when loading the table using pandas.read_sql().
pandemy.SetIndexError – If setting the index of the returned pandas.DataFrame fails when index_col is specified and chunksize is None.
See also
save_df() : Save a pandas.DataFrame to a table in the database.
pandas.read_sql() : Read SQL query or database table into a pandas.DataFrame.
pandas.to_datetime() : The function used for datetime conversion with parse_dates.
Examples
When specifying the chunksize parameter the database connection must remain open to be able to process the DataFrames from the iterator. The processing must occur within the context manager:
import pandemy

db = pandemy.SQLiteDb(file='Runescape.db')

with db.engine.connect() as conn:
    df_gen = db.load_table(sql='ItemTradedInStore', conn=conn, chunksize=3)

    for df in df_gen:
        print(df)  # Process your DataFrames
        ...
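The reason the connection must stay open is that the chunks are fetched lazily. The following sketch shows the same pattern with Python's built-in sqlite3 module, yielding lists of rows rather than DataFrames. It is an illustration under stated assumptions, not how load_table() is implemented: the generator iter_chunks and the column TransactionId are hypothetical.

```python
import sqlite3
from contextlib import closing
from typing import Iterator, List, Tuple


def iter_chunks(cursor: sqlite3.Cursor, chunksize: int) -> Iterator[List[Tuple]]:
    """Yield lists of at most `chunksize` rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(chunksize)
        if not rows:
            return
        yield rows


chunk_sizes = []
with closing(sqlite3.connect(':memory:')) as conn:
    conn.execute('CREATE TABLE ItemTradedInStore(TransactionId INTEGER PRIMARY KEY)')
    conn.executemany(
        'INSERT INTO ItemTradedInStore(TransactionId) VALUES (?)',
        [(i,) for i in range(1, 8)],
    )
    cursor = conn.execute(
        'SELECT TransactionId FROM ItemTradedInStore ORDER BY TransactionId'
    )
    # Rows are fetched lazily, so the chunks must be consumed while conn is open.
    for chunk in iter_chunks(cursor, chunksize=3):
        chunk_sizes.append(len(chunk))

print(chunk_sizes)
```

Seven rows read in chunks of three give two full chunks and one final chunk with the remaining row.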
- manage_foreign_keys(conn, action)[source]
Manage how the database handles foreign key constraints.
Should be implemented by DatabaseManagers whose SQL dialect supports enabling or disabling the check of foreign key constraints, e.g. SQLite.
- Parameters
conn (sqlalchemy.engine.base.Connection) – An open connection to the database.
action (str) – How to handle foreign key constraints in the database.
- Raises
pandemy.ExecuteStatementError – If changing the handling of foreign key constraint fails.
pandemy.InvalidInputError – If invalid input is supplied to action.
See also
execute() : Execute a SQL statement.
Examples
Enable and trigger a foreign key constraint using a SQLite in-memory database.
>>> import pandemy
>>> db = pandemy.SQLiteDb()  # Create an in-memory database
>>> with db.engine.begin() as conn:
...     db.execute(
...         sql="CREATE TABLE Owner(OwnerId INTEGER PRIMARY KEY, OwnerName TEXT)",
...         conn=conn
...     )
...     db.execute(
...         sql=(
...             "CREATE TABLE Store("
...             "StoreId INTEGER PRIMARY KEY, "
...             "StoreName TEXT, "
...             "OwnerId INTEGER REFERENCES Owner(OwnerId)"
...             ")"
...         ),
...         conn=conn
...     )
...     db.execute(
...         sql="INSERT INTO Owner(OwnerId, OwnerName) VALUES(1, 'Shop keeper')",
...         conn=conn
...     )
...     db.execute(
...         sql=(
...             "INSERT INTO Store(StoreId, StoreName, OwnerId) "
...             "VALUES(1, 'Lumbridge General Supplies', 2)"
...         ),
...         conn=conn  # OwnerId = 2 is not a valid FOREIGN KEY reference
...     )
...     db.manage_foreign_keys(conn=conn, action='ON')
...     db.execute(
...         sql=(
...             "INSERT INTO Store(StoreId, StoreName, OwnerId) "
...             "VALUES(1, 'Falador General Store', 3)"
...         ),
...         conn=conn  # OwnerId = 3 is not a valid FOREIGN KEY reference
...     )
Traceback (most recent call last):
...
pandemy.exceptions.ExecuteStatementError: IntegrityError: ('(sqlite3.IntegrityError) FOREIGN KEY constraint failed',)
- merge_df(df, table, conn, on_cols, merge_cols='all', merge_index_cols=False, omit_update_where_clause=True, chunksize=None, nan_to_none=True, datetime_cols_dtype=None, datetime_format='%Y-%m-%d %H:%M:%S', localize_tz=None, target_tz=None, dry_run=False)[source]
Merge data from a pandas.DataFrame into a table.
This method executes a combined UPDATE and INSERT statement on a table using the MERGE statement. The method is similar to upsert_table() but it only executes one statement instead of two.
Databases implemented in Pandemy that support the MERGE statement: Oracle
The column names of df and table must match.
New in version 1.2.0.
- Parameters
df (pandas.DataFrame) – The DataFrame with data to merge into table.
table (str) – The name of the table to merge data into.
conn (sqlalchemy.engine.base.Connection) – An open connection to the database.
on_cols (Sequence[str] or None) – The columns from df to use in the ON clause to identify how to merge rows into table.
merge_cols (str or Sequence[str] or None, default 'all') – The columns of table to merge (update or insert) with data from df. The default string 'all' will update all columns. If None no columns will be selected for merge. This is useful if only columns of the index of df should be updated by specifying merge_index_cols.
merge_index_cols (bool or Sequence[str], default False) – If the index columns of df should be included in the columns to merge. True indicates that the index should be included. If the index is a pandas.MultiIndex a sequence of strings that maps against the levels to include can be used to only include the desired levels. False excludes the index column(s) from being updated which is the default.
omit_update_where_clause (bool, default True) –
If the WHERE clause of the UPDATE clause should be omitted from the MERGE statement. The WHERE clause is implemented as OR conditions where the target and source columns to update are not equal.
Databases in Pandemy that support this option are: Oracle
Example of the SQL generated when omit_update_where_clause=False:
[...]
WHEN MATCHED THEN
    UPDATE
    SET
        t.IsAdventurer = s.IsAdventurer,
        t.CustomerId = s.CustomerId,
        t.CustomerName = s.CustomerName
    WHERE
        t.IsAdventurer <> s.IsAdventurer OR
        t.CustomerId <> s.CustomerId OR
        t.CustomerName <> s.CustomerName
[...]
chunksize (int or None, default None) – Divide df into chunks and perform the merge in chunks of chunksize rows. If None all rows of df are processed in one chunk, which is the default. If df has many rows dividing it into chunks may increase performance.
nan_to_none (bool, default True) – If columns with missing values (NaN values) that are of type pandas.NA, pandas.NaT or numpy.nan should be converted to standard Python None. Some databases do not support these types in parametrized SQL statements.
datetime_cols_dtype ({'str', 'int'} or None, default None) – If the datetime columns of df should be converted to string or integer data types before updating the table. If None conversion of datetime columns is omitted, which is the default. When using 'int' the datetime columns are converted to the number of seconds since the Unix Epoch of 1970-01-01. The timezone of the datetime columns should be in UTC when using 'int'.
datetime_format (str, default r'%Y-%m-%d %H:%M:%S') – The datetime (strftime) format to use when converting datetime columns to strings.
localize_tz (str or None, default None) – The name of the timezone which to localize naive datetime columns into. If None (the default) timezone localization is omitted.
target_tz (str or None, default None) – The name of the target timezone to convert timezone aware datetime columns, or columns that have been localized by localize_tz, into. If None (the default) timezone conversion is omitted.
dry_run (bool, default False) – Do not execute the merge. Instead return the SQL statement that would have been executed on the database as a string.
- Returns
A result object from the executed statement or the SQL statement that would have been executed if dry_run is True. The result object will be None if df is empty.
- Return type
sqlalchemy.engine.CursorResult or str or None
- Raises
pandemy.ExecuteStatementError – If an error occurs when executing the MERGE statement.
pandemy.InvalidColumnNameError – If a column name of merge_cols, merge_index_cols or on_cols is not among the columns or index of the input DataFrame df.
pandemy.InvalidInputError – Invalid values or types for input parameters or if the timezone localization or conversion fails.
pandemy.InvalidTableNameError – If the supplied table name is invalid.
pandemy.SQLStatementNotSupportedError – If the database dialect does not support the MERGE statement.
See also
save_df() : Save a pandas.DataFrame to the specified table in the database.
upsert_table() : Update a table with a pandas.DataFrame and optionally insert new rows.
Examples
Create a MERGE statement from an empty pandas.DataFrame that represents a table in a database by using the parameter dry_run=True. The begin() method of the DatabaseManager.engine is mocked to avoid having to connect to a real database.
>>> import pandas as pd
>>> import pandemy
>>> from unittest.mock import MagicMock
>>> df = pd.DataFrame(columns=['ItemId', 'ItemName', 'MemberOnly', 'IsAdventurer'])
>>> df = df.set_index('ItemId')
>>> df
Empty DataFrame
Columns: [ItemName, MemberOnly, IsAdventurer]
Index: []
>>> db = pandemy.OracleDb(
...     username='Fred_the_Farmer',
...     password='Penguins-sheep-are-not',
...     host='fred.farmer.rs',
...     port=1234,
...     service_name='woollysheep'
... )
>>> db.engine.begin = MagicMock()  # Mock the begin method
>>> with db.engine.begin() as conn:
...     merge_stmt = db.merge_df(
...         df=df,
...         table='Item',
...         conn=conn,
...         on_cols=['ItemName'],
...         merge_cols='all',
...         merge_index_cols=False,
...         dry_run=True
...     )
>>> print(merge_stmt)
MERGE INTO Item t
USING (
    SELECT
        :ItemName AS ItemName,
        :MemberOnly AS MemberOnly,
        :IsAdventurer AS IsAdventurer
    FROM DUAL
) s
ON (
    t.ItemName = s.ItemName
)
WHEN MATCHED THEN
    UPDATE
    SET
        t.MemberOnly = s.MemberOnly,
        t.IsAdventurer = s.IsAdventurer
WHEN NOT MATCHED THEN
    INSERT (
        t.ItemName,
        t.MemberOnly,
        t.IsAdventurer
    )
    VALUES (
        s.ItemName,
        s.MemberOnly,
        s.IsAdventurer
    )
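To make the structure of such a statement easier to follow, here is a rough sketch of how a MERGE statement of this shape could be assembled from column names. The function build_merge_statement is hypothetical and is not Pandemy's implementation. It assumes that multiple ON columns are combined with AND, and it leaves the ON columns out of UPDATE SET, as in the statement printed above.

```python
from typing import Sequence


def build_merge_statement(
    table: str,
    on_cols: Sequence[str],
    merge_cols: Sequence[str],
    omit_update_where_clause: bool = True,
) -> str:
    """Return an Oracle style MERGE statement with named bind parameters.

    Hypothetical helper for illustration, not Pandemy's implementation.
    """
    # Columns used for matching rows are inserted but never updated.
    update_cols = [c for c in merge_cols if c not in on_cols]
    all_cols = list(on_cols) + update_cols

    select = ', '.join(f':{c} AS {c}' for c in all_cols)
    on = ' AND '.join(f't.{c} = s.{c}' for c in on_cols)  # AND is an assumption
    update = ', '.join(f't.{c} = s.{c}' for c in update_cols)
    insert_cols = ', '.join(f't.{c}' for c in all_cols)
    insert_vals = ', '.join(f's.{c}' for c in all_cols)

    lines = [
        f'MERGE INTO {table} t',
        f'USING (SELECT {select} FROM DUAL) s',
        f'ON ({on})',
        f'WHEN MATCHED THEN UPDATE SET {update}',
    ]
    if not omit_update_where_clause:
        # Only update rows where at least one target column differs from the source.
        where = ' OR '.join(f't.{c} <> s.{c}' for c in update_cols)
        lines.append(f'WHERE {where}')
    lines.append(
        f'WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})'
    )
    return '\n'.join(lines)


stmt = build_merge_statement(
    table='Item',
    on_cols=['ItemName'],
    merge_cols=['MemberOnly', 'IsAdventurer'],
    omit_update_where_clause=False,
)
print(stmt)
```

With omit_update_where_clause left at its default of True the WHERE line is not added, so every matched row is updated regardless of whether its values differ.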
- save_df(df, table, conn, if_exists='append', index=True, index_label=None, chunksize=None, schema=None, dtype=None, datetime_cols_dtype=None, datetime_format='%Y-%m-%d %H:%M:%S', localize_tz=None, target_tz=None, method=None)[source]
Save the pandas.DataFrame df to the specified table in the database.
If the table does not exist it will be created. If the table already exists the column names of the pandas.DataFrame df must match the table column definitions. Uses the pandas.DataFrame.to_sql() method to write the pandas.DataFrame to the database.
- Parameters
df (pandas.DataFrame) – The DataFrame to save to the database.
table (str) – The name of the table where to save the pandas.DataFrame.
conn (sqlalchemy.engine.base.Connection) – An open connection to the database.
if_exists (str, {'append', 'replace', 'drop-replace', 'fail'}, default 'append') – How to update an existing table in the database:
'append': Append df to the existing table.
'replace': Delete all records from the table and then write df to the table.
'drop-replace': Drop the table, recreate it, and then write df to the table.
'fail': Raise pandemy.TableExistsError if the table exists.
New in version 1.2.0: 'drop-replace'
index (bool, default True) – Write pandas.DataFrame index as a column. Uses the name of the index as the column name for the table.
index_label (str or sequence of str or None, default None) – Column label for index column(s). If None is given (default) and index is True, then the index names are used. A sequence should be given if the pandas.DataFrame uses a pandas.MultiIndex.
chunksize (int or None, default None) – The number of rows in each batch to be written at a time. If None, all rows will be written at once.
schema (str or None, default None) – Specify the schema (if database flavor supports this). If None, use default schema.
dtype (dict or scalar, default None) – Specifying the data type for columns. If a dictionary is used, the keys should be the column names and the values should be the SQLAlchemy types or strings for the sqlite3 legacy mode. If a scalar is provided, it will be applied to all columns.
datetime_cols_dtype ({'str', 'int'} or None, default None) – If the datetime columns of df should be converted to string or integer data types before saving the table. If None no conversion of datetime columns is performed, which is the default. When using 'int' the datetime columns are converted to the number of seconds since the Unix Epoch of 1970-01-01. The timezone of the datetime columns should be in UTC when using 'int'.
New in version 1.2.0.
datetime_format (str, default r'%Y-%m-%d %H:%M:%S') – The datetime (strftime) format to use when converting datetime columns to strings.
New in version 1.2.0.
localize_tz (str or None, default None) –
The name of the timezone which to localize naive datetime columns into. If None (the default) timezone localization is omitted.
New in version 1.2.0.
target_tz (str or None, default None) –
The name of the target timezone to convert timezone aware datetime columns, or columns that have been localized by localize_tz, into. If None (the default) timezone conversion is omitted.
New in version 1.2.0.
method (None, 'multi', callable, default None) –
Controls the SQL insertion clause used:
- None:
Uses standard SQL INSERT clause (one per row).
- 'multi':
Pass multiple values in a single INSERT clause. It uses a special SQL syntax not supported by all backends. This usually provides better performance for analytic databases like Presto and Redshift, but has worse performance for traditional SQL backend if the table contains many columns. For more information check the SQLAlchemy documentation.
- callable with signature (pd_table, conn, keys, data_iter):
This can be used to implement a more performant insertion method based on specific backend dialect features. See: pandas SQL insertion method.
- Raises
pandemy.DeleteFromTableError – If data in the table cannot be deleted when if_exists='replace'.
pandemy.InvalidInputError – Invalid values or types for input parameters or if the timezone localization or conversion fails.
pandemy.InvalidTableNameError – If the supplied table name is invalid.
pandemy.SaveDataFrameError – If the pandas.DataFrame cannot be saved to the table.
pandemy.TableExistsError – If the table exists and if_exists='fail'.
See also
load_table() : Load a SQL table into a pandas.DataFrame.
merge_df() : Merge data from a pandas.DataFrame into a table.
upsert_table() : Update a table with a pandas.DataFrame and insert new rows if any.
pandas.DataFrame.to_sql() : Write records stored in a DataFrame to a SQL database.
pandas SQL insertion method : Details about using the method parameter.
Examples
Save a pandas.DataFrame to a SQLite in-memory database.
>>> import pandas as pd
>>> import pandemy
>>> df = pd.DataFrame(data=[
...         [1, 'Lumbridge General Supplies', 'Lumbridge', 1],
...         [2, 'Varrock General Store', 'Varrock', 2],
...         [3, 'Falador General Store', 'Falador', 3]
...     ],
...     columns=['StoreId', 'StoreName', 'Location', 'OwnerId']
... )
>>> df = df.set_index('StoreId')
>>> df
                          StoreName   Location  OwnerId
StoreId
1        Lumbridge General Supplies  Lumbridge        1
2             Varrock General Store    Varrock        2
3             Falador General Store    Falador        3
>>> db = pandemy.SQLiteDb()  # Create an in-memory database
>>> with db.engine.begin() as conn:
...     db.save_df(df=df, table='Store', conn=conn)
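The interplay of localize_tz, target_tz, datetime_format and datetime_cols_dtype can be hard to picture, so here is a sketch of the conversion order for a single value using only the standard library. The helper convert_datetime is hypothetical and does not reflect Pandemy's internals. It takes timezone objects rather than timezone names, and a fixed UTC+01:00 offset stands in for a named timezone.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union


def convert_datetime(
    value: datetime,
    dtype: str,
    datetime_format: str = '%Y-%m-%d %H:%M:%S',
    localize_tz: Optional[timezone] = None,
    target_tz: Optional[timezone] = None,
) -> Union[str, int]:
    """Convert one datetime to a string or to seconds since the Unix Epoch.

    Hypothetical helper for illustration, not Pandemy's implementation.
    """
    if localize_tz is not None and value.tzinfo is None:
        # Localize: attach a timezone to a naive value without shifting the clock time.
        value = value.replace(tzinfo=localize_tz)
    if target_tz is not None and value.tzinfo is not None:
        # Convert: express the same instant in the target timezone.
        value = value.astimezone(target_tz)
    if dtype == 'str':
        return value.strftime(datetime_format)
    if dtype == 'int':
        if value.tzinfo is None:
            # Naive values are treated as UTC, in line with the advice for 'int'.
            value = value.replace(tzinfo=timezone.utc)
        return int(value.timestamp())
    raise ValueError(f"dtype must be 'str' or 'int', got {dtype!r}")


cet = timezone(timedelta(hours=1))  # fixed offset standing in for a named timezone
naive = datetime(2021, 1, 1, 13, 0, 0)

as_str = convert_datetime(naive, 'str', localize_tz=cet, target_tz=timezone.utc)
as_int = convert_datetime(naive, 'int', localize_tz=cet, target_tz=timezone.utc)
print(as_str, as_int)
```

Localization happens before conversion, so a naive 13:00 localized to UTC+01:00 becomes 12:00 in UTC before it is formatted or turned into an integer.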
- upsert_table(df, table, conn, where_cols, upsert_cols='all', upsert_index_cols=False, update_only=False, chunksize=None, nan_to_none=True, datetime_cols_dtype=None, datetime_format='%Y-%m-%d %H:%M:%S', localize_tz=None, target_tz=None, dry_run=False)[source]
Update a table with data from a pandas.DataFrame and insert new rows if any.
This method executes an UPDATE statement followed by an INSERT statement (UPSERT) to update the rows of a table with a pandas.DataFrame and insert new rows. The INSERT statement can be omitted with the update_only parameter.
The column names of df and table must match.
New in version 1.2.0.
- Parameters
df (pandas.DataFrame) – The DataFrame with data to upsert.
table (str) – The name of the table to upsert.
conn (sqlalchemy.engine.base.Connection) – An open connection to the database.
where_cols (Sequence[str]) – The columns from df to use in the WHERE clause to identify the rows to upsert.
upsert_cols (str or Sequence[str] or None, default 'all') – The columns from table to upsert with data from df. The default string 'all' will upsert all columns. If None no columns will be selected for upsert. This is useful if only columns of the index of df should be upserted by specifying upsert_index_cols.
upsert_index_cols (bool or Sequence[str], default False) – If the index columns of df should be included in the columns to upsert. True indicates that the index should be included. If the index is a pandas.MultiIndex a sequence of strings that maps against the levels to include can be used to only include the desired levels. False excludes the index column(s) from being upserted which is the default.
update_only (bool, default False) – If True the table should only be updated and new rows not inserted. If False (the default) perform an update and insert new rows.
chunksize (int or None, default None) – Divide df into chunks and perform the upsert in chunks of chunksize rows. If None all rows of df are processed in one chunk, which is the default. If df has many rows dividing it into chunks may increase performance.
nan_to_none (bool, default True) – If columns with missing values (NaN values) that are of type pandas.NA, pandas.NaT or numpy.nan should be converted to standard Python None. Some databases do not support these types in parametrized SQL statements.
datetime_cols_dtype ({'str', 'int'} or None, default None) – If the datetime columns of df should be converted to string or integer data types before upserting the table. SQLite cannot handle datetime objects as parameters and should use this option. If None conversion of datetime columns is omitted, which is the default. When using 'int' the datetime columns are converted to the number of seconds since the Unix Epoch of 1970-01-01. The timezone of the datetime columns should be in UTC when using 'int'.
datetime_format (str, default r'%Y-%m-%d %H:%M:%S') – The datetime (strftime) format to use when converting datetime columns to strings.
localize_tz (str or None, default None) – The name of the timezone which to localize naive datetime columns into. If None (the default) timezone localization is omitted.
target_tz (str or None, default None) – The name of the target timezone to convert timezone aware datetime columns, or columns that have been localized by localize_tz, into. If None (the default) timezone conversion is omitted.
dry_run (bool, default False) – Do not execute the upsert. Instead return the SQL statements that would have been executed on the database. The return value is a tuple ('UPDATE statement', 'INSERT statement'). If update_only is True the INSERT statement will be None.
- Returns
Result objects from the executed statements or the SQL statements that would have been executed if dry_run is True. The result objects will be None if df is empty.
- Return type
Tuple[sqlalchemy.engine.CursorResult, Optional[sqlalchemy.engine.CursorResult]] or Tuple[str, Optional[str]] or Tuple[None, None]
- Raises
pandemy.ExecuteStatementError – If an error occurs when executing the UPDATE and/or INSERT statement.
pandemy.InvalidColumnNameError – If a column name of upsert_cols or upsert_index_cols is not among the columns or index of df.
pandemy.InvalidInputError – Invalid values or types for input parameters or if the timezone localization or conversion fails.
pandemy.InvalidTableNameError – If the supplied table name is invalid.
See also
execute() : Execute a SQL statement.
load_table() : Load a table into a pandas.DataFrame.
merge_df() : Merge data from a pandas.DataFrame into a table.
save_df() : Save a pandas.DataFrame to a table in the database.
Examples
Create a simple table called Customer and insert some data from a pandas.DataFrame (df). Change the first row and add a new row to df. Finally upsert the table with df.
>>> import pandas as pd
>>> import pandemy
>>> df = pd.DataFrame(data={
...         'CustomerId': [1, 2],
...         'CustomerName': ['Zezima', 'Dr Harlow']
...     }
... )
>>> df = df.set_index('CustomerId')
>>> df
            CustomerName
CustomerId
1                 Zezima
2              Dr Harlow
>>> db = pandemy.SQLiteDb()  # Create an in-memory database
>>> with db.engine.begin() as conn:
...     _ = db.execute(
...         sql=(
...             'CREATE TABLE Customer('
...             'CustomerId INTEGER PRIMARY KEY, '
...             'CustomerName TEXT NOT NULL)'
...         ),
...         conn=conn
...     )
...     db.save_df(df=df, table='Customer', conn=conn)
>>> df.loc[1, 'CustomerName'] = 'Baraek'  # Change data
>>> df.loc[3, 'CustomerName'] = 'Mosol Rei'  # Add new data
>>> df
            CustomerName
CustomerId
1                 Baraek
2              Dr Harlow
3              Mosol Rei
>>> with db.engine.begin() as conn:
...     _, _ = db.upsert_table(
...         df=df,
...         table='Customer',
...         conn=conn,
...         where_cols=['CustomerId'],
...         upsert_index_cols=True
...     )
...     df_upserted = db.load_table(
...         sql='SELECT * FROM Customer ORDER BY CustomerId ASC',
...         conn=conn,
...         index_col='CustomerId'
...     )
>>> df_upserted
            CustomerName
CustomerId
1                 Baraek
2              Dr Harlow
3              Mosol Rei
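The two-statement approach (UPDATE followed by INSERT) can be sketched directly against SQLite with Python's built-in sqlite3 module. The SQL below is an assumption about what such a pair of statements might look like, in particular the INSERT guarded by WHERE NOT EXISTS. It is not the SQL that upsert_table() generates; use dry_run=True to inspect the real statements.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE Customer(CustomerId INTEGER PRIMARY KEY, CustomerName TEXT NOT NULL)'
)
conn.executemany(
    'INSERT INTO Customer(CustomerId, CustomerName) VALUES (:CustomerId, :CustomerName)',
    [
        {'CustomerId': 1, 'CustomerName': 'Zezima'},
        {'CustomerId': 2, 'CustomerName': 'Dr Harlow'},
    ],
)

rows = [
    {'CustomerId': 1, 'CustomerName': 'Baraek'},     # changed row
    {'CustomerId': 2, 'CustomerName': 'Dr Harlow'},  # unchanged row
    {'CustomerId': 3, 'CustomerName': 'Mosol Rei'},  # new row
]

# Step 1: update the rows that already exist (matched on CustomerId).
update_sql = 'UPDATE Customer SET CustomerName = :CustomerName WHERE CustomerId = :CustomerId'
# Step 2: insert only the rows whose CustomerId is not yet in the table.
insert_sql = (
    'INSERT INTO Customer(CustomerId, CustomerName) '
    'SELECT :CustomerId, :CustomerName '
    'WHERE NOT EXISTS (SELECT 1 FROM Customer WHERE CustomerId = :CustomerId)'
)

conn.executemany(update_sql, rows)
conn.executemany(insert_sql, rows)

result = conn.execute(
    'SELECT CustomerId, CustomerName FROM Customer ORDER BY CustomerId'
).fetchall()
print(result)
conn.close()
```

Skipping the second executemany call corresponds to the behaviour described for update_only=True: existing rows change and no new rows appear.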
- class pandemy.SQLiteDb(file=':memory:', must_exist=False, container=None, driver='sqlite3', url=None, connect_args=None, engine_config=None, engine=None, **kwargs)[source]
Bases: DatabaseManager
A SQLite DatabaseManager.
- Parameters
file (str or pathlib.Path, default ':memory:') – The path (absolute or relative) to the SQLite database file. The default creates an in-memory database.
must_exist (bool, default False) – If True validate that file exists unless file=':memory:'. If it does not exist pandemy.DatabaseFileNotFoundError is raised. If False the validation is omitted.
container (SQLContainer or None, default None) – A container of database statements that the SQLite DatabaseManager can use.
driver (str, default 'sqlite3') – The database driver to use. The default is the Python built-in module sqlite3, which is also the default driver of SQLAlchemy. When the default is used no driver name is displayed in the connection URL.
New in version 1.2.0.
url (str or sqlalchemy.engine.URL or None, default None) – A SQLAlchemy connection URL to use for creating the database engine. It overrides the value of file and must_exist.
New in version 1.2.0.
connect_args (dict or None, default None) – Additional arguments sent to the driver upon connection that further customize the connection.
New in version 1.2.0.
engine_config (dict or None, default None) – Additional keyword arguments passed to the sqlalchemy.create_engine() function.
engine (sqlalchemy.engine.Engine or None, default None) – A SQLAlchemy Engine to use as the database engine of SQLiteDb. It overrides the value of file and must_exist. If specified the value of url should be None. If None (the default) the engine will be created from file or url.
New in version 1.2.0.
**kwargs (dict) – Additional keyword arguments that are not used by SQLiteDb.
- Raises
pandemy.CreateConnectionURLError – If there are errors with url.
pandemy.CreateEngineError – If the creation of the database engine fails.
pandemy.DatabaseFileNotFoundError – If the database file does not exist when must_exist=True.
pandemy.InvalidInputError – If the parameters are specified with invalid input.
See also
pandemy.DatabaseManager : The parent class.
sqlalchemy.create_engine() : The function used to create the database engine.
SQLite dialect and drivers : The SQLite dialect and drivers in SQLAlchemy.
SQLite : The SQLite homepage.
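The interplay between file and must_exist can be pictured with a small stand-alone sketch. The helper name validate_db_file and the use of the built-in FileNotFoundError are assumptions made for illustration only; pandemy raises pandemy.DatabaseFileNotFoundError and its internal implementation may well differ.

```python
from pathlib import Path
import tempfile


def validate_db_file(file=':memory:', must_exist=False):
    """Hypothetical helper mirroring the documented `must_exist` rule."""
    # An in-memory database has no file on disk, so it is never validated.
    if file == ':memory:':
        return file
    path = Path(file)
    # Only check for the file when the caller asks for it.
    if must_exist and not path.exists():
        raise FileNotFoundError(f'file={str(path)!r} does not exist')
    return path


# The in-memory default passes even with must_exist=True.
in_memory = validate_db_file(must_exist=True)

with tempfile.TemporaryDirectory() as tmp:
    missing = Path(tmp) / 'Runescape.db'  # illustrative file name
    # must_exist=False (the default) skips the validation.
    unchecked = validate_db_file(missing)
    try:
        validate_db_file(missing, must_exist=True)
        raised = False
    except FileNotFoundError:
        raised = True
```

With must_exist=False a missing file is accepted, which matches the usual SQLite behaviour of creating the database file on first connection.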
- property conn_str
Backwards compatibility for the conn_str attribute.
The conn_str attribute is deprecated in version 1.2.0 and replaced by the url attribute.
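A deprecated attribute that is kept for backwards compatibility is commonly implemented as a read-only property that forwards to its replacement. The sketch below shows that general pattern on a hypothetical class named Db; whether pandemy emits a DeprecationWarning, and the exact value conn_str returns, are assumptions and not taken from the source.

```python
import warnings


class Db:
    """Hypothetical stand-in for a database manager class."""

    def __init__(self, url):
        self.url = url

    @property
    def conn_str(self):
        # Assumed behaviour: warn the caller, then forward to `url`.
        warnings.warn(
            'conn_str is deprecated, use url instead',
            DeprecationWarning,
            stacklevel=2,
        )
        return self.url


db = Db('sqlite:///Runescape.db')  # illustrative URL

# Record warnings so the deprecation notice can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    legacy_value = db.conn_str
```

New code should read the url attribute directly.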
- manage_foreign_keys(conn, action='ON')[source]
Manage how the database handles foreign key constraints.
In SQLite the check of foreign key constraints is not enabled by default.
- Parameters
conn (sqlalchemy.engine.base.Connection) – An open connection to the database.
action ({'ON', 'OFF'}) – Enable (‘ON’) or disable (‘OFF’) the check of foreign key constraints.
- Raises
pandemy.ExecuteStatementError – If the enabling/disabling of the foreign key constraints fails.
pandemy.InvalidInputError – If invalid input is supplied to action.
See also
execute() : Execute a SQL statement.
Examples
Enable and trigger a foreign key constraint using an in-memory database.
>>> import pandemy
>>> db = pandemy.SQLiteDb()  # Create an in-memory database
>>> with db.engine.begin() as conn:
...     db.execute(
...         sql="CREATE TABLE Owner(OwnerId INTEGER PRIMARY KEY, OwnerName TEXT)",
...         conn=conn
...     )
...     db.execute(
...         sql=(
...             "CREATE TABLE Store("
...             "StoreId INTEGER PRIMARY KEY, "
...             "StoreName TEXT, "
...             "OwnerId INTEGER REFERENCES Owner(OwnerId)"
...             ")"
...         ),
...         conn=conn
...     )
...     db.execute(
...         sql="INSERT INTO Owner(OwnerId, OwnerName) VALUES(1, 'Shop keeper')",
...         conn=conn
...     )
...     db.execute(
...         sql=(
...             "INSERT INTO Store(StoreId, StoreName, OwnerId) "
...             "VALUES(1, 'Lumbridge General Supplies', 2)"
...         ),
...         conn=conn  # OwnerId = 2 is not a valid FOREIGN KEY reference
...     )
...     db.manage_foreign_keys(conn=conn, action='ON')
...     db.execute(
...         sql=(
...             "INSERT INTO Store(StoreId, StoreName, OwnerId) "
...             "VALUES(1, 'Falador General Store', 3)"
...         ),
...         conn=conn  # OwnerId = 3 is not a valid FOREIGN KEY reference
...     )
Traceback (most recent call last):
...
pandemy.exceptions.ExecuteStatementError: IntegrityError: ('(sqlite3.IntegrityError) FOREIGN KEY constraint failed',)
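To see the underlying SQLite behaviour without pandemy, the following sketch uses only the built-in sqlite3 module. In SQLite, foreign key enforcement is switched with the statement PRAGMA foreign_keys; that manage_foreign_keys() issues this same statement is an assumption, not something the text above states. The connection is opened in autocommit mode because SQLite treats the pragma as a no-op inside an open transaction.

```python
import sqlite3

# isolation_level=None gives autocommit mode, so no transaction is open
# when the PRAGMA statements are executed.
conn = sqlite3.connect(':memory:', isolation_level=None)

# Start from a known state: foreign key checks disabled.
conn.execute('PRAGMA foreign_keys = OFF')
conn.execute('CREATE TABLE Owner(OwnerId INTEGER PRIMARY KEY, OwnerName TEXT)')
conn.execute(
    'CREATE TABLE Store('
    'StoreId INTEGER PRIMARY KEY, '
    'StoreName TEXT, '
    'OwnerId INTEGER REFERENCES Owner(OwnerId))'
)
conn.execute("INSERT INTO Owner(OwnerId, OwnerName) VALUES(1, 'Shop keeper')")

# Accepted: OwnerId = 2 does not exist, but the check is disabled.
conn.execute(
    "INSERT INTO Store(StoreId, StoreName, OwnerId) "
    "VALUES(1, 'Lumbridge General Supplies', 2)"
)

# Enable the check and read the setting back.
conn.execute('PRAGMA foreign_keys = ON')
fk_enabled = conn.execute('PRAGMA foreign_keys').fetchone()[0]

# Rejected: OwnerId = 3 does not exist and the check is now enabled.
try:
    conn.execute(
        "INSERT INTO Store(StoreId, StoreName, OwnerId) "
        "VALUES(2, 'Falador General Store', 3)"
    )
    rejected = False
except sqlite3.IntegrityError as exc:
    rejected = True
    error_message = str(exc)

store_count = conn.execute('SELECT COUNT(*) FROM Store').fetchone()[0]
conn.close()
```

The row inserted before the check was enabled stays in the table; enabling enforcement does not validate existing rows.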
- class pandemy.OracleDb(username=None, password=None, host=None, port=None, service_name=None, sid=None, container=None, driver='cx_oracle', url=None, connect_args=None, engine_config=None, engine=None, **kwargs)[source]
Bases: DatabaseManager
An Oracle DatabaseManager.
Requires the cx_Oracle package to be able to create a connection to the database.
To use a DSN connection string specified in the Oracle connection config file, tnsnames.ora, set host to the desired network service name in tnsnames.ora and leave port, service_name and sid as None. Using a tnsnames.ora file is needed to connect to Oracle Cloud Autonomous Databases.
New in version 1.1.0.
- Parameters
username (str or None, default None) – The username of the database account. Must be specified if url and engine are both None.
password (str or None, default None) – The password of the database account. Must be specified if url and engine are both None.
host (str or None, default None) – The host name or server IP-address where the database is located. Must be specified if url and engine are both None.
port (int or str or None, default None) – The port the host is listening on. The default port of Oracle databases is 1521.
service_name (str or None, default None) – The name of the service used for the database connection.
sid (str or None, default None) – The SID used for the connection. SID is the name of the database instance on the host. Note that sid and service_name should not be specified at the same time.
container (SQLContainer or None, default None) – A container of database statements that OracleDb can use.
driver (str, default 'cx_oracle') – The database driver to use.
New in version 1.2.0.
url (str or sqlalchemy.engine.URL or None, default None) – A SQLAlchemy connection URL to use for creating the database engine. Specifying url overrides the parameters: username, password, host, port, service_name, sid and driver. If url is specified, engine should be None.
connect_args (dict or None, default None) – Additional arguments sent to the driver upon connection that further customize the connection.
engine_config (dict or None, default None) – Additional keyword arguments passed to the sqlalchemy.create_engine() function.
engine (sqlalchemy.engine.Engine or None, default None) – A SQLAlchemy Engine to use as the database engine of OracleDb. If None (the default), the engine will be created from the other parameters. When specified, it overrides the parameters: username, password, host, port, service_name, sid and driver. If specified, url should be None.
**kwargs (dict) – Additional keyword arguments that are not used by OracleDb.
- Raises
pandemy.CreateConnectionURLError – If the creation of the connection URL fails.
pandemy.CreateEngineError – If the creation of the database engine fails.
pandemy.InvalidInputError – If invalid combinations of the parameters are used.
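The parameter rules listed above can be summarised as a few checks. The sketch below is a hypothetical stand-alone function, check_oracle_params, written only to make the rules concrete; it raises the built-in ValueError where pandemy raises pandemy.InvalidInputError, and the order and wording of pandemy's own checks are not known from this page.

```python
def check_oracle_params(username=None, password=None, host=None,
                        service_name=None, sid=None, url=None, engine=None):
    """Hypothetical validation mirroring the documented parameter rules."""
    # url and engine are alternatives and should not be combined.
    if url is not None and engine is not None:
        raise ValueError('url and engine cannot be specified at the same time')

    # When url or engine is given, the remaining parameters are overridden.
    if url is not None or engine is not None:
        return 'url' if url is not None else 'engine'

    # Without url and engine the account and host details are required.
    required = (('username', username), ('password', password), ('host', host))
    missing = [name for name, value in required if value is None]
    if missing:
        raise ValueError(f'missing required parameters: {", ".join(missing)}')

    # sid and service_name are mutually exclusive.
    if sid is not None and service_name is not None:
        raise ValueError('sid and service_name cannot both be specified')
    return 'parameters'


def error_of(**kwargs):
    """Return the error message for a parameter combination, or None."""
    try:
        check_oracle_params(**kwargs)
    except ValueError as exc:
        return str(exc)
    return None


account = dict(username='Fred_the_Farmer', password='secret', host='my_dsn_name')
ok_dsn = error_of(**account)
both_names = error_of(sid='shears', service_name='woollysheep', **account)
no_host = error_of(username='Fred_the_Farmer', password='secret')
url_and_engine = error_of(url='oracle+cx_oracle://u:p@h', engine=object())
```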
See also
pandemy.DatabaseManager : The parent class.
sqlalchemy.create_engine() : The function used to create the database engine.
The cx_Oracle database driver : Details of the cx_Oracle driver and its usage in SQLAlchemy.
Specifying connect_args : Details about the connect_args parameter.
tnsnames.ora : Oracle connection config file.
Examples
Create an instance of OracleDb and connect using a service:
>>> db = pandemy.OracleDb(
...     username='Fred_the_Farmer',
...     password='Penguins-sheep-are-not',
...     host='fred.farmer.rs',
...     port=1234,
...     service_name='woollysheep'
... )
>>> str(db)
'OracleDb(oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234?service_name=woollysheep)'
>>> db.username
'Fred_the_Farmer'
>>> db.password
'Penguins-sheep-are-not'
>>> db.host
'fred.farmer.rs'
>>> db.port
1234
>>> db.service_name
'woollysheep'
>>> db.url
oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234?service_name=woollysheep
>>> db.engine
Engine(oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234?service_name=woollysheep)
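The URL in the output above follows a fixed layout: the service name goes into the query string, whereas a SID would appear as the path after the port. The sketch below rebuilds that layout with the standard library. The function build_oracle_url is hypothetical; pandemy works with a sqlalchemy.engine.URL object, and SQLAlchemy's own escaping rules may differ from the percent-encoding assumed here.

```python
from urllib.parse import quote, urlencode


def build_oracle_url(username, password, host, port=None,
                     service_name=None, sid=None, driver='cx_oracle'):
    """Hypothetical helper that assembles an Oracle connection URL string."""
    # Percent-encode the credentials so characters like '@' or ':' are safe.
    user = quote(username, safe='')
    secret = quote(password, safe='')
    url = f'oracle+{driver}://{user}:{secret}@{host}'
    if port is not None:
        url += f':{port}'
    # A SID is placed in the path part of the URL.
    if sid is not None:
        url += f'/{sid}'
    # A service name is passed as a query parameter.
    if service_name is not None:
        url += '?' + urlencode({'service_name': service_name})
    return url


service_url = build_oracle_url(
    username='Fred_the_Farmer',
    password='Penguins-sheep-are-not',
    host='fred.farmer.rs',
    port=1234,
    service_name='woollysheep',
)

# A DSN connection uses only the net service name as host.
dsn_url = build_oracle_url(
    username='Fred_the_Farmer',
    password='Penguins-sheep-are-not',
    host='my_dsn_name',
)
```

Unlike the masked representation shown by OracleDb, this string contains the password in clear text, so it should not be logged.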
Connect with a DSN connection string using a net service name from a tnsnames.ora config file and supply additional connection arguments and engine configuration:
>>> import cx_Oracle
>>> connect_args = {
...     'encoding': 'UTF-8',
...     'nencoding': 'UTF-8',
...     'mode': cx_Oracle.SYSDBA,
...     'events': True
... }
>>> engine_config = {
...     'coerce_to_unicode': False,
...     'arraysize': 40,
...     'auto_convert_lobs': False
... }
>>> db = pandemy.OracleDb(
...     username='Fred_the_Farmer',
...     password='Penguins-sheep-are-not',
...     host='my_dsn_name',
...     connect_args=connect_args,
...     engine_config=engine_config
... )
>>> db
OracleDb(
    username='Fred_the_Farmer',
    password='***',
    host='my_dsn_name',
    port=None,
    service_name=None,
    sid=None,
    driver='cx_oracle',
    url=oracle+cx_oracle://Fred_the_Farmer:***@my_dsn_name,
    container=None,
    connect_args={'encoding': 'UTF-8', 'nencoding': 'UTF-8', 'mode': 2, 'events': True},
    engine_config={'coerce_to_unicode': False, 'arraysize': 40, 'auto_convert_lobs': False},
    engine=Engine(oracle+cx_oracle://Fred_the_Farmer:***@my_dsn_name)
)
If you are familiar with the connection URL syntax of SQLAlchemy you can create an instance of OracleDb directly from a URL:
>>> url = 'oracle+cx_oracle://Fred_the_Farmer:Penguins-sheep-are-not@my_dsn_name'
>>> db = pandemy.OracleDb(url=url)
>>> db
OracleDb(
    username='Fred_the_Farmer',
    password='***',
    host='my_dsn_name',
    port=None,
    service_name=None,
    sid=None,
    driver='cx_oracle',
    url=oracle+cx_oracle://Fred_the_Farmer:***@my_dsn_name,
    container=None,
    connect_args={},
    engine_config={},
    engine=Engine(oracle+cx_oracle://Fred_the_Farmer:***@my_dsn_name)
)
If you already have a database engine and would like to use it with OracleDb simply create the instance like this:
>>> from sqlalchemy import create_engine
>>> url = 'oracle+cx_oracle://Fred_the_Farmer:Penguins-sheep-are-not@fred.farmer.rs:1234/shears'
>>> engine = create_engine(url, coerce_to_unicode=False)
>>> db = pandemy.OracleDb(engine=engine)
>>> db
OracleDb(
    username='Fred_the_Farmer',
    password='***',
    host='fred.farmer.rs',
    port=1234,
    service_name=None,
    sid='shears',
    driver='cx_oracle',
    url=oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234/shears,
    container=None,
    connect_args={},
    engine_config={},
    engine=Engine(oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234/shears)
)
This is useful if you have special needs for creating the engine that cannot be accomplished with the engine constructor of OracleDb. See for instance the cx_Oracle SessionPool example in the SQLAlchemy docs.
- classmethod from_url(url, container=None, engine_config=None)[source]
Create an instance of OracleDb from a SQLAlchemy URL.
Deprecated since version 1.2.0: Use the url parameter of the normal initializer of OracleDb instead.
- Parameters
url (str or sqlalchemy.engine.URL) – A SQLAlchemy URL to use for creating the database engine.
container (SQLContainer or None, default None) – A container of database statements that OracleDb can use.
engine_config (dict or None, default None) – Additional keyword arguments passed to the sqlalchemy.create_engine() function.
- Raises
pandemy.CreateConnectionURLError – If url is invalid.
Examples
If you are familiar with the connection URL syntax of SQLAlchemy you can create an instance of OracleDb directly from a URL:
>>> url = 'oracle+cx_oracle://Fred_the_Farmer:Penguins-sheep-are-not@my_dsn_name'
>>> db = pandemy.OracleDb.from_url(url)
>>> db
OracleDb(
    username='Fred_the_Farmer',
    password='***',
    host='my_dsn_name',
    port=None,
    service_name=None,
    sid=None,
    driver='cx_oracle',
    url=oracle+cx_oracle://Fred_the_Farmer:***@my_dsn_name,
    container=None,
    connect_args={},
    engine_config={},
    engine=Engine(oracle+cx_oracle://Fred_the_Farmer:***@my_dsn_name)
)
- classmethod from_engine(engine, container=None)[source]
Create an instance of OracleDb from a SQLAlchemy Engine.
Deprecated since version 1.2.0: Use the engine parameter of the normal initializer of OracleDb instead.
- Parameters
engine (sqlalchemy.engine.Engine) – A SQLAlchemy Engine to use as the database engine of OracleDb.
container (SQLContainer or None, default None) – A container of database statements that OracleDb can use.
Examples
If you already have a database engine and would like to use it with OracleDb simply create the instance like this:
>>> from sqlalchemy import create_engine
>>> url = 'oracle+cx_oracle://Fred_the_Farmer:Penguins-sheep-are-not@fred.farmer.rs:1234/shears'
>>> engine = create_engine(url, coerce_to_unicode=False)
>>> db = pandemy.OracleDb.from_engine(engine)
>>> db
OracleDb(
    username='Fred_the_Farmer',
    password='***',
    host='fred.farmer.rs',
    port=1234,
    service_name=None,
    sid='shears',
    driver='cx_oracle',
    url=oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234/shears,
    container=None,
    connect_args={},
    engine_config={},
    engine=Engine(oracle+cx_oracle://Fred_the_Farmer:***@fred.farmer.rs:1234/shears)
)
This is useful if you have special needs for creating the engine that cannot be accomplished with the engine constructor of OracleDb. See for instance the cx_Oracle SessionPool example in the SQLAlchemy docs.
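Both from_url() and from_engine() are deprecated in favour of the url and engine parameters of the initializer. A deprecated alternative constructor is typically kept as a thin wrapper that delegates to the initializer, as in the sketch below. The class Db is a hypothetical stand-in; whether pandemy emits a DeprecationWarning from these classmethods is an assumption.

```python
import warnings


class Db:
    """Hypothetical stand-in for a database manager class."""

    def __init__(self, url=None, engine=None, container=None):
        self.url = url
        self.engine = engine
        self.container = container

    @classmethod
    def from_url(cls, url, container=None):
        # Assumed behaviour: warn, then delegate to the initializer.
        warnings.warn(
            'from_url is deprecated, use the url parameter instead',
            DeprecationWarning,
            stacklevel=2,
        )
        return cls(url=url, container=container)


url = 'oracle+cx_oracle://Fred_the_Farmer:Penguins-sheep-are-not@my_dsn_name'

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    old_style = Db.from_url(url)   # deprecated spelling

new_style = Db(url=url)            # preferred spelling
```

Both spellings produce equivalent objects in this sketch, so migrating is a matter of replacing the classmethod call with the matching keyword argument.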