QM Data Pipeline
The QM (Quality Management) Data Pipeline is responsible for managing and processing QM-related data. It extracts data from the source database (PostgreSQL), applies necessary transformations to align with schema requirements, and loads the data into the target database (MySQL). The pipeline ensures data consistency and completeness through validation and transformation processes. Key tasks include:
Validating data fields for accuracy and ensuring required fields are present.
Mapping reviews and schedule fields to the target schema.
Handling data updates and upserts to maintain data integrity and avoid duplication.
The QM Data Pipeline consists of three parallel pipelines, each responsible for processing a specific subset of the data:
Reviews Pipeline:
Handles the transformation and processing of reviews-related data.
Ensures that reviews details, and other relevant fields, are aligned with the target schema.
Responsible for maintaining the integrity of review-level data.
Schedule Pipeline:
Processes data related to schedules set for reviews.
Transforms fields like
id
,questionaire_id
andreviewers
to match the target schema.Ensures accurate mapping between reviewers and their respective questionnaire.
Schedule_filter Pipeline:
Processes data related to schedule_filter set for schedules.
Transforms fields like
id
,wrapups
andagents
match the target schema.Ensures accurate mapping between different fields
These pipelines operate independently but are orchestrated together for cohesive data handling.
Configurations
Configurations for the QM Data Pipeline are provided in a qm_data_pipeline_config.yaml
format to ensure flexibility and adaptability. These configurations are designed for normal and ideal use cases and are advised to be used as-is to achieve optimal results.
source:
type: "postgre"
connection_string: "postgresql+psycopg2://sa:Expertflow123@ef-postgresql.ef-external.svc:5432/qm_db"
queries:
review:
database: "qm_db"
collection_name: "public.review"
filter: {}
replication_key: "updated_at"
transformation: "transform_review_data"
num_batches: 50
query_keys:
- "compare_reviewer_gold"
- "compare_teams_gold"
- "review_volume_gold"
- "team_agent_skill_gold"
schedule:
database: "qm_db"
collection_name: "public.schedule"
filter: {}
replication_key: "updated_at"
transformation: "transform_schedule_data"
num_batches: 50
query_keys:
schedule_filter:
database: "qm_db"
collection_name: "public.schedule_filter"
filter: {}
replication_key: "updated_at"
transformation: "transform_schedule_filter_data"
num_batches: 50
query_keys:
batch_size: 30000 # Adjust as needed
target:
type: "mysql"
db_url: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db"
enable_ssl: true # Enable or disable SSL connections
ssl_ca: "/transflux/certificates/mysql_certs/ca.pem"
ssl_cert: "/transflux/certificates/mysql_certs/client-cert.pem"
ssl_key: "/transflux/certificates/mysql_certs/client-key.pem"
configdb:
type: "mysql"
db_url: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db"
enable_ssl: true # Enable or disable SSL connections
ssl_ca: "/transflux/certificates/mysql_certs/ca.pem"
ssl_cert: "/transflux/certificates/mysql_certs/client-cert.pem"
ssl_key: "/transflux/certificates/mysql_certs/client-key.pem"
transformation:
schedule_interval: "0 */5 * * *"
Source Configuration
This section defines the PostgreSQL source settings for data extraction.
type
: Specifies the data source type.
Example:"postgre"
indicates that PostgreSQL is the source.connection_string
:
The connection string for PostgreSQL. It includes the following components:username
andpassword
for authentication.PostgreSQL host and port.
Example (non-replica):
CODEconnection_string: "postgresql+psycopg2://sa:Expertflow123@ef-postgresql.ef-external.svc:5432/qm_db"
queries
:
A dictionary containing query configurations for different pipelines.review
: Configurations for the review pipeline.database
: Name of the PostgreSQL database from where review data is being extracted. Example:"qm_db"
.collection_name
: Name of the table within dedicated database. Example:"public.review"
.filter
: Query filter applied to fetch data. Example:{}
(fetch all records).replication_key
: Field used to track updates. Example:"updated_at"
.transformation
: Transformation function name. Example:"transform_review_data"
.num_batches
: Number of data batches. Example:50
.query_keys
: Reserved for gold queries ( for loading data in gold table if needed ).Mapping query with keys
compare_reviewer_gold
compare_teams_gold
review_volume_gold
team_agent_skill_gold
would be fetched from query registry
schedule
: Configurations for the schedule pipeline.Similar fields as the
review
pipeline, but for thepublic.schedule
collection withoutquery_keys
schedule_filter
: Configurations for the schedule pipeline.Similar fields as the
review
pipeline, but for thepublic.schedule_filer
collection withoutquery_keys
Batch Size
batch_size
:
Number of records processed per batch.
Example:30000
.
Target Configuration
This section defines the target MySQL database settings for data loading.
type
: Specifies the target database type.
Example:"mysql"
.db_url
: Connection string for the target MySQL database.
Format:"mysql+pymysql://<username>:<password>@<host>:<port>/<database>"
.
Example:"mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db"
.enable_ssl
: Enables SSL communication with the MySQL database.
Example:true
.SSL Configuration:
ssl_ca
: Path to the CA certificate. Example:"/transflux/certificates/mysql_certs/ca.pem"
.ssl_cert
: Path to the client certificate. Example:"/transflux/certificates/mysql_certs/client-cert.pem"
.ssl_key
: Path to the client private key. Example:"/transflux/certificates/mysql_certs/client-key.pem"
.
Config Database
The configuration database (configdb
) stores metadata and operational settings for Data Platform.
Fields are identical to the target configuration.
Schedule Interval
schedule_interval
:
Cron expression defining the pipeline's schedule in Data Platform.
Example:"0 */5 * * *"
(runs every 5 hours).