pg_lib API documentation

class pg_lib.pg_encoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: json.encoder.JSONEncoder

default(obj)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)

encode(o)

Return a JSON string representation of a Python data structure.

>>> from json.encoder import JSONEncoder
>>> JSONEncoder().encode({"foo": ["bar", "baz"]})
'{"foo": ["bar", "baz"]}'

item_separator = ', '
iterencode(o, _one_shot=False)

Encode the given object and yield each string representation as available.

For example:

for chunk in JSONEncoder().iterencode(bigobject):
    mysocket.write(chunk)

key_separator = ': '
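
As a usage sketch, the encoder subclass can be passed to json.dumps through the cls argument. Which extra types pg_encoder.default actually serialises is not documented above, so the payload below sticks to plain JSON types; this is an illustration, not a statement about the encoder's behaviour.

import json

from pg_lib import pg_encoder

# pg_encoder is a json.JSONEncoder subclass, so it plugs into json.dumps
# via the cls argument. The payload uses only plain JSON types because the
# extra types handled by default() are not documented here.
payload = {"foo": ["bar", "baz"]}
print(json.dumps(payload, cls=pg_encoder))
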
class pg_lib.pg_engine[source]

Bases: object

add_source()[source]

The method adds a new source to the replication catalog. The method calls the function fn_refresh_parts() which generates the log tables used by the replica. If the source is already present a warning is issued and no other action is performed.

build_alter_table(schema, token)[source]

The method builds the alter table statement from the token data. The method currently supports the following statements: DROP TABLE, ADD COLUMN, CHANGE, MODIFY.

The CHANGE and MODIFY statements are a potential source of breakage for the replica because of the MySQL implicit fallback data types. For a better understanding please have a look at

http://www.cybertec.at/why-favor-postgresql-over-mariadb-mysql/

Parameters:
  • schema – The schema where the affected table is stored in PostgreSQL.
  • token – A dictionary with the tokenised sql statement
Returns:

the DDL query in the PostgreSQL dialect

Return type:

string
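
A minimal sketch of calling build_alter_table is shown below. The token dictionary is normally produced by the sql_parser class; the keys used here (name, command, alter_cmd and their contents) are assumptions made only to illustrate the call.

from pg_lib import pg_engine

pg_eng = pg_engine()

# Hypothetical token produced by the sql_parser class; the keys below
# are assumptions used only for illustration.
token = {
    "name": "employees",
    "command": "ALTER TABLE",
    "alter_cmd": [
        {"command": "ADD COLUMN", "name": "middle_name",
         "type": "varchar", "dimension": 30},
    ],
}

# Returns the DDL statement translated into the PostgreSQL dialect.
ddl_query = pg_eng.build_alter_table("my_schema", token)
print(ddl_query)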

build_create_index(schema, table, index_data)[source]

The method loops over the list index_data and builds a new list with the statements for the indices.

Parameters:
  • schema – the schema where the table belongs
  • table – the table name
  • index_data – the index dictionary used to build the create index statements
Returns:

a list with the alter and create index for the given table

Return type:

list

build_enum_ddl(schema, enm_dic)[source]

The method builds the enum DDL using the token data. The PostgreSQL system catalog is queried to determine whether the enum exists and needs to be altered. The alter is not written in the replica log table but executed as a single statement, as PostgreSQL does not allow the ALTER to be part of a multi-command SQL statement.

Parameters:
  • schema – the schema where the enumeration is present
  • enm_dic – a dictionary with the enumeration details
Returns:

a dictionary with the pre_alter and post_alter statements (e.g. the pre alter creates the type, the post alter drops the type)

Return type:

dictionary

check_auto_maintenance()[source]

This method checks if the maintenance for the given source is required. The SQL compares the timestamp of the last maintenance stored in the replica catalogue with NOW(). If the elapsed time is bigger than the configuration parameter auto_maintenance then it returns true. Otherwise it returns false.

Returns:flag which tells if the maintenance should run or not
Return type:boolean
check_postgis()[source]

The method checks whether PostGIS is present or not on the destination database.

check_replica_schema()[source]

The method checks if the schema sch_chameleon exists

Returns:count from information_schema.schemata
Return type:integer
check_schema_mappings(exclude_current_source=False)[source]

The method checks if there is already a destination schema in the stored schema mappings. As each schema should be managed by one mapping only, if the method returns None then the source can be stored safely; otherwise the appropriate action is left to the calling methods, as this method doesn't take any decision. The method assumes there is a database connection active. The method returns a list or None. If a list is returned it contains the count and the destination schema names already present in the replica catalogue.

Parameters:exclude_current_source – If set to true the check excludes the current source name from the check. The default is false.
Returns:the schema already mapped in the replica catalogue.
Return type:list
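
A hedged sketch of how a calling method might use the return value: None means the mapping is free and the source can be stored, anything else means the destination schema is already mapped.

from pg_lib import pg_engine

# Assumes the engine has been configured elsewhere and an active database
# connection has been opened with connect_db().
pg_eng = pg_engine()

already_mapped = pg_eng.check_schema_mappings()
if already_mapped is None:
    # no conflicting mapping: the source can be stored safely
    pg_eng.add_source()
else:
    # the returned list carries the count and the destination schema
    # already present in the replica catalogue
    print("destination schema already mapped:", already_mapped)
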
check_source()[source]

The method checks if the source name stored in the class variable self.source is already present. As this method is used in both add and drop source it just returns the count of the sources. Any decision about the source is left to the calling method. The method assumes there is a database connection active.

check_source_consistent()[source]

This method checks if the database is consistent using the source's high watermark and the source's flag b_consistent. If the batch data is larger than the source's high watermark then the source is marked consistent and all the log data stored with the source's tables are set to null in order to ensure all the tables are replicated.

clean_batch_data()[source]

This method removes all the batch data for the source id stored in the class variable self.i_id_source.

The method assumes there is a database connection active.

clean_not_processed_batches()[source]

The method cleans up the not processed batches rows from the table sch_chameleon.t_log_replica. The method should be executed only before starting a replica process. The method assumes there is a database connection active.

cleanup_idx_cons(schema, table)[source]

The method cleans up the constraints and indices for the given table using the statements collected in collect_idx_cons.

Parameters:
  • schema – the table's schema
  • table – the table's name

cleanup_replayed_batches()[source]

The method cleans up the replayed batches for the given source according to the source's parameter batch_retention

cleanup_source_tables()[source]

The method cleans up the tables for the active source in sch_chameleon.t_replica_tables.

cleanup_table_events()[source]

The method cleans up the log events in the source’s log tables for the given tables

collect_idx_cons(schema, table)[source]

The method collects indices and primary keys for the given table from the views v_idx_pkeys and v_fkeys.

Parameters:
  • schema – the table's schema
  • table – the table's name

connect_db()[source]

Connects to PostgreSQL using the parameters stored in self.dest_conn. The dictionary is built from the parameters set in self.pg_conn with the key dbname added. This method's connection and cursors are widely used in the procedures, except for the replay process which uses a dedicated connection and cursor.

copy_data(csv_file, schema, table, column_list)[source]

The method copies the data into PostgreSQL using psycopg2's copy_expert. The csv_file is a file-like object which can be either a CSV file or a StringIO object, according to the configuration parameter copy_mode. The method assumes there is a database connection active.

Parameters:
  • csv_file – file like object with the table’s data stored in CSV format
  • schema – the schema used in the COPY FROM command
  • table – the table name used in the COPY FROM command
  • column_list – A string with the list of columns to use in the COPY FROM command already quoted and comma separated
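
A minimal sketch of feeding copy_data with an in-memory CSV follows. The connection setup and the CSV layout are assumptions made for illustration, since the real data is produced by the source engine.

import io

from pg_lib import pg_engine

# Assumes self.dest_conn has already been populated from the configuration
# before connect_db() is called.
pg_eng = pg_engine()
pg_eng.connect_db()

# A StringIO object is a valid file-like csv_file when copy_mode streams
# from memory; the CSV layout shown here is only an illustration.
csv_file = io.StringIO('1,alpha\n2,beta\n')
column_list = '"id","value"'

pg_eng.copy_data(csv_file, "my_schema", "my_table", column_list)
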
create_database_schema(schema_name)[source]

The method creates a database schema. The create schema is issued with the clause IF NOT EXISTS. Should the schema already be present, the create is skipped.

Parameters:schema_name – The schema name to be created.
create_foreign_keys()[source]

The method creates and validates the foreign keys if we are not keeping the existing schema.

create_idx_cons(schema, table)[source]

The method creates the constraints and indices for the given table using the statements collected in collect_idx_cons. The foreign keys are not created at this stage as they may be left inconsistent during the initial replay phase. The foreign key creation is managed by __create_foreign_keys() which is executed when the replica reaches the consistent status.

Parameters:
  • schema – the table's schema
  • table – the table's name

create_indices(schema, table, index_data)[source]

The method loops over the list index_data and creates the indices on the table specified with schema and table parameters. The method assumes there is a database connection active.

Parameters:
  • schema – the schema name where table belongs
  • table – the table name where the data should be inserted
  • index_data – a list of dictionaries with the index metadata for the given table.
Returns:

a list with the eventual column(s) used as primary key

Return type:

list
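
A sketch of the call, assuming an index_data list shaped like the metadata extracted from the source's information_schema; the dictionary keys shown below are assumptions made only to illustrate the call.

from pg_lib import pg_engine

# Assumes an engine with an active database connection.
pg_eng = pg_engine()

# Hypothetical index metadata; the keys are assumptions for illustration.
index_data = [
    {"index_name": "PRIMARY", "index_columns": ["id"], "non_unique": 0},
    {"index_name": "idx_last_name", "index_columns": ["last_name"], "non_unique": 1},
]

# Returns the column(s) used as primary key, if any.
table_pkey = pg_eng.create_indices("my_schema", "employees", index_data)
print(table_pkey)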

create_replica_schema()[source]

The method installs the replica schema sch_chameleon if not already present.

create_table(table_metadata, table_name, schema, metadata_type)[source]

Executes the create table statement returned by __build_create_table (mysql or pgsql) in the destination schema.

Parameters:
  • table_metadata – the column dictionary extracted from the source's information_schema or built by the sql_parser class
  • table_name – the table name
  • schema – the schema where the table belongs
  • metadata_type – the metadata type, currently supported mysql and pgsql
detach_replica()[source]

The method detaches the replica from MySQL, resets all the sequences and creates the foreign keys using the dictionary extracted from MySQL. The result is a standalone set of schemas ready to work.

The foreign keys are first created as invalid and then validated in a second step.

disconnect_db()[source]

The method disconnects the PostgreSQL connection if one is active. Otherwise it does nothing.

drop_database_schema(schema_name, cascade)[source]

The method drops a database schema. If the cascade parameter is true the schema is dropped with the clause CASCADE.

Parameters:
  • schema_name – The schema name to be dropped.
  • cascade – If true the schema is dropped with the clause CASCADE.
drop_replica_schema()[source]

The method removes the service schema discarding all the replica references. The replicated tables are kept in place though.

drop_source()[source]

The method deletes the source from the replication catalogue. The log tables are dropped as well, discarding any replica reference for the source.

end_maintenance()[source]

The method sets the flag b_maintenance to false for the given source

generate_default_statements(schema, table, column, create_column=None)[source]

The method gets the default value associated with the table and column, removing the cast.

Parameters:
  • schema – The schema name
  • table – The table name
  • column – The column name
Returns:the statements for dropping and creating the default value on the affected table
Return type:dictionary
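
A hedged sketch of using the returned dictionary; the key names drop and create used below are assumptions, not documented here.

from pg_lib import pg_engine

# Assumes an engine with an active database connection.
pg_eng = pg_engine()

# The returned dictionary carries the statements for dropping and
# re-creating the default value; the key names below are assumptions.
default_statements = pg_eng.generate_default_statements(
    "my_schema", "employees", "hire_date")
print(default_statements["drop"])
print(default_statements["create"])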

get_active_sources()[source]

The method counts all the sources with state not in ‘ready’ or ‘stopped’. The method assumes there is a database connection active.

get_batch_data()[source]

The method updates the batch status to started for the given source_id and returns the batch information.

Returns:psycopg2 fetchall results without any manipulation
Return type:psycopg2 tuple
get_catalog_version()[source]

The method returns the replica schema's version

Returns:the version string selected from sch_chameleon.v_version
Return type:text
get_data_type(column, schema, table)[source]

The method determines whether the specified type has to be overridden or not.

Parameters:
  • column – the column dictionary extracted from the information_schema or built in the sql_parser class
  • schema – the schema name
  • table – the table name
Returns:

the postgresql converted column type

Return type:

string

get_existing_pkey(schema, table)[source]

The method gets the primary key of an existing table and returns the field(s) composing the PKEY as a list.

Parameters:
  • schema – the schema name where table belongs
  • table – the table name where the data should be inserted
Returns:a list with the eventual column(s) used as primary key
Return type:list
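
A short sketch of checking whether a table has a primary key before adding it to the replica; the schema and table names are placeholders.

from pg_lib import pg_engine

# Assumes an engine with an active database connection.
pg_eng = pg_engine()

pkey_columns = pg_eng.get_existing_pkey("my_schema", "employees")
if pkey_columns:
    print("primary key columns:", pkey_columns)
else:
    # tables without a primary key are copied but not replicated
    print("no primary key found")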

get_inconsistent_tables()[source]

The method collects the tables in a not consistent state. The information is stored in a dictionary whose key is the table's name. The dictionary is used in the read replica loop to determine whether the table's modifications should be ignored because the table is not in a consistent state.

Returns:a dictionary with the tables in inconsistent state and their snapshot coordinates.
Return type:dictionary
get_log_data(log_id)[source]

The method gets the error log entries, if any, from the replica schema.

Parameters:log_id – the log id for filtering the row by identifier
Returns:a dictionary with the errors logged
Return type:dictionary

get_replica_paused()[source]

The method returns the status of the replica. This value is used in both read/replay replica methods for updating the corresponding flags.

Returns:the b_paused flag for the current source
Return type:boolean

get_replica_status()[source]

The method gets the replica status for the given source. The method assumes there is a database connection active.

get_schema_list()[source]

The method gets the list of source schemas for the given source. The list is generated using the mapping in sch_chameleon.t_sources. Any change in the configuration file is ignored. The method assumes there is a database connection active.

get_schema_mappings()[source]

The method gets the schema mappings for the given source. The list is the one stored in the table sch_chameleon.t_sources. Any change in the configuration file is ignored. The method assumes there is a database connection active.

Returns:the schema mappings extracted from the replica catalogue
Return type:dictionary

get_status()[source]

The method gets the status for all sources configured in the target database.

Returns:a list with the status details
Return type:list

get_table_pkey(schema, table)[source]

The method queries the table sch_chameleon.t_replica_tables and gets the primary key associated with the table, if any. If there is no primary key the method returns None

Parameters:
  • schema – The table schema
  • table – The table name
Returns:

the primary key associated with the table

Return type:

list

get_tables_disabled(format='csv')[source]

The method returns a CSV or a python list of tables excluded from the replica. The origin’s schema is determined from the source’s schema mappings jsonb.

Returns:CSV list of tables excluded from the replica
Return type:text
grant_select()[source]

The method grants the select permissions on all the tables on the replicated schemas to the database roles listed in the source’s variable grant_select_to. In the case a role doesn’t exist the method emits an error message and skips the missing user.

insert_batch(group_insert)[source]

Fallback method for the batch insert. Each row event is processed individually and any problematic row is discarded into the table t_discarded_rows. The row is encoded in base64 in order to prevent any encoding or type issue.

Parameters:group_insert – the event data built in mysql_engine
insert_data(schema, table, insert_data, column_list)[source]

The method is a fallback procedure for when the copy method fails. The procedure performs a row-by-row insert, which is very slow but capable of skipping the rows with problematic data (e.g. encoding issues).

Parameters:
  • schema – the schema name where table belongs
  • table – the table name where the data should be inserted
  • insert_data – a list of records extracted from the database using the unbuffered cursor
  • column_list – the list of column names quoted for the inserts
insert_source_timings()[source]

The method inserts the source timings in the tables t_last_received and t_last_replayed. On conflict sets the replay/receive timestamps to null. The method assumes there is a database connection active.

reindex_table(schema, table)[source]

The method runs a REINDEX TABLE on the table defined by schema and name.

Parameters:
  • schema – the table's schema
  • table – the table's name

replay_replica()[source]

The method replays the row images in the target database using the function fn_replay_mysql. The function returns a composite type. The first element is a boolean flag which is true if the batch still requires a replay and false if it doesn't; in that case the while loop ends. The second element is an optional list of table names. If any table causes an error during the replay the problem is captured and the table is removed from the replica. The table name is then returned by the function. As the function can find multiple tables with errors during a single replay run, the table names are stored in a list (actually a PostgreSQL array; see the create_schema.sql file for more details).

Each batch which is looped through can also find multiple tables, so a list of lists is returned to the replica_engine's calling method.
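
A hedged sketch of a calling loop: the return value is treated as a list of lists of table names removed because of errors, which follows the description above but remains an assumption of the sketch.

from pg_lib import pg_engine

# Assumes an engine configured for the source with an active connection.
pg_eng = pg_engine()

# The return value is treated as a list of lists of table names removed
# from the replica because of replay errors (an assumption based on the
# description above).
removed_tables = pg_eng.replay_replica()
for table_group in removed_tables or []:
    for table_name in table_group:
        print("table removed from the replica:", table_name)
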
rollback_upgrade_v1()[source]

The procedure rolls back the upgrade, dropping the schema sch_chameleon and renaming the version 1 schema back to sch_chameleon.

run_maintenance()[source]

The method runs the maintenance for the given source. After the replica daemons are paused the procedure detaches the log tables from the parent log table and performs a VACUUM FULL against them. If any error occurs the tables are re-attached to the parent table and the replica daemons are resumed.
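
A sketch of combining the maintenance methods; whether end_maintenance must be called explicitly after run_maintenance is an assumption of this sketch.

from pg_lib import pg_engine

# Assumes an engine configured for the source with an active connection.
pg_eng = pg_engine()

# Run the maintenance only when the auto_maintenance interval has elapsed.
# Calling end_maintenance() afterwards to clear b_maintenance is an
# assumption of this sketch.
if pg_eng.check_auto_maintenance():
    pg_eng.run_maintenance()
    pg_eng.end_maintenance()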

save_discarded_row(row_data)[source]

The method saves the discarded row in the table t_discarded_rows along with the id_batch. The row is encoded in base64 as the t_row_data is a text field.

Parameters:row_data – the row data dictionary
save_master_status(master_status)[source]

This method saves the master data determining which log table should be used in the next batch. The method assumes there is a database connection active.

Parameters:master_status – the master data with the binlogfile and the log position
Returns:the batch id or none if no batch has been created
Return type:integer
set_application_name(action='')[source]

The method sets the application name in the replica using the variable self.pg_conn.global_conf.source_name, making it simpler to find the replication processes. If the source name is not set then a generic PGCHAMELEON name is used.

set_autocommit_db(auto_commit)[source]

The method sets the auto_commit flag for the class connection self.pgsql_conn. In general the connection is always in autocommit, but in some operations (e.g. update_schema_mappings) it is better to run the process in a single transaction in order to avoid inconsistencies.

Parameters:auto_commit – boolean flag which sets autocommit on or off.
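
A minimal sketch of wrapping a multi-statement change in a single transaction by toggling autocommit, as update_schema_mappings does internally; the statement in the middle is a placeholder.

from pg_lib import pg_engine

# Assumes an engine with an active connection stored in self.pgsql_conn.
pg_eng = pg_engine()

# Disable autocommit so the statements below run in a single transaction,
# then restore the usual autocommit behaviour.
pg_eng.set_autocommit_db(False)
try:
    # placeholder for a multi-statement change; update_schema_mappings
    # follows the same pattern internally
    pg_eng.unregister_table("my_schema", "obsolete_table")
    pg_eng.pgsql_conn.commit()  # self.pgsql_conn is the psycopg2 connection
finally:
    pg_eng.set_autocommit_db(True)
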
set_batch_processed(id_batch)[source]

The method updates the flag b_processed and sets the processed timestamp for the given batch id. The event ids are aggregated into the table t_batch_events used by the replay function.

Parameters:id_batch – the id batch to set as processed
set_consistent_table(table, schema)[source]

The method sets to NULL the binlog name and position for the given table. When the table is marked consistent the read replica loop reads and saves the table's row images.

Parameters:
  • table – the table name
  • schema – the table's schema
set_lock_timeout()[source]

The method sets the lock timeout using the value stored in the class attribute lock_timeout.

set_read_paused(read_paused)[source]

The method sets the read process flag b_paused for the given source. The update is performed for the given source and for the negation of b_paused. This approach prevents unnecessary updates on the table t_last_received.

Parameters:read_paused – the flag to set for the read replica process.
set_replay_paused(read_paused)[source]

The method sets the replay process flag b_paused for the given source. The update is performed for the given source and for the negation of b_paused. This approach prevents unnecessary updates on the table t_last_replayed.

Parameters:read_paused – the flag to set for the replay replica process.
set_source_highwatermark(master_status, consistent)[source]

This method saves the master data within the source. The values are used to determine whether the database has reached the consistent point.

Parameters:master_status – the master data with the binlogfile and the log position
set_source_id()[source]

The method sets the class attribute i_id_source for the self.source. The method assumes there is a database connection active.

set_source_status(source_status)[source]

The method updates the source status for the source_name and sets the class attribute i_id_source. The method assumes there is a database connection active.

Parameters:source_status – The source status to be set.
store_table(schema, table, table_pkey, master_status)[source]

The method saves the table name along with the primary key definition in the table t_replica_tables. This is required in order to let the replay procedure know which primary key to use when replaying the update and delete statements. If the table has no primary key it is not stored. A table without a primary key is copied and the indices are created like any other table. However the replica doesn't work for tables without a primary key.

If the class variable master status is set then the master’s coordinates are saved along with the table. This happens in general when a table is added to the replica or the data is refreshed with sync_tables.

Parameters:
  • schema – the schema name to store in the table t_replica_tables
  • table – the table name to store in the table t_replica_tables
  • table_pkey – a list with the primary key’s columns. empty if there’s no pkey
  • master_status – the master status data.
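
A sketch of registering a table in t_replica_tables; the structure of master_status (binlog file and position) shown below is an assumption.

from pg_lib import pg_engine

# Assumes an engine with an active database connection.
pg_eng = pg_engine()

# Register the table and its primary key so the replay procedure knows
# which key to use for UPDATE and DELETE statements.
table_pkey = ["id"]
# Hypothetical master status with the binlog file and position; the exact
# structure is an assumption of this sketch.
master_status = [{"File": "mysql-bin.000012", "Position": 4287}]
pg_eng.store_table("my_schema", "employees", table_pkey, master_status)
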
swap_schemas()[source]

The method loops over the schema_loading class dictionary and swaps the loading with the destination schemas performing a double rename. The method assumes there is a database connection active.

swap_source_log_table()[source]

The method swaps the sources’s log table and returns the next log table stored in the v_log_table array. The method expects an active database connection.

Returns:The t_log_replica’s active subpartition
Return type:text
swap_tables()[source]

The method loops over the tables stored in the class

truncate_table(schema, table)[source]

The method truncates the table defined by schema and name.

Parameters:
  • schema – the table's schema
  • table – the table's name

unregister_table(schema, table)[source]

This method is used to remove a table from the replica catalogue. The table is just deleted from the table sch_chameleon.t_replica_tables.

Parameters:
  • schema – the schema name where the table is stored
  • table – the table name to remove from t_replica_tables
unset_lock_timeout()[source]

The method unsets the lock timeout, reverting the change applied by set_lock_timeout.

update_schema_mappings()[source]

The method updates the schema mappings for the given source. Before executing the updates the method checks for the need to run an update and for any mapping already present in the replica catalogue. If everything is fine the database connection is set autocommit=false. The method updates the schemas in the table t_replica_tables and then updates the mappings in the table t_sources. After the final update the commit is issued to make the updates permanent.

Todo:The method should run only when the replica is stopped for the given source. The method should also replay all the logged rows for the given source before updating the schema mappings, to avoid getting an inconsistent replica.
upgrade_catalogue_v1()[source]

The method upgrades a replica catalogue from version 1 to version 2. The original catalogue is not altered but just renamed. All the existing data is transferred into the new catalogue, which is loaded using the create_schema.sql file.

upgrade_catalogue_v20()[source]

The method applies the migration scripts to the replica catalogue version 2.0. The method checks that all sources are in stopped or ready state.

write_batch(group_insert)[source]

Main method for adding the batch data in the log tables. The row data from group_insert is mogrified in CSV format and stored in the string-like object csv_file.

psycopg2's copy_expert is used to store the event data in PostgreSQL.

Should any error occur the procedure falls back to insert_batch.

Parameters:group_insert – the event data built in mysql_engine
write_ddl(token, query_data, destination_schema)[source]

The method writes the DDL built from the tokenised sql into PostgreSQL.

Parameters:
  • token – the tokenised query
  • query_data – query’s metadata (schema,binlog, etc.)
  • destination_schema – the postgresql destination schema determined using the schema mappings.
class pg_lib.pgsql_source[source]

Bases: object

init_replica()[source]

The method performs a full init replica for the given source
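
A hedged sketch of driving a full init replica from a pgsql_source; the configuration attributes required before the call are not documented here, so they are assumed to be set elsewhere.

from pg_lib import pgsql_source

# The source and destination configuration must be set on the object
# before the call; the required attributes are not documented here, so
# none are shown.
source = pgsql_source()
source.init_replica()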