Forms Data Pipeline
The Forms Data Pipeline is responsible for managing and processing forms-related data. It extracts data from the source (MongoDB), applies the transformations needed to align with schema requirements, and loads the data into the target database (MySQL). Key tasks include:
- Validating data fields for completeness and accuracy.
- Mapping form structure fields to the target schema.
- Handling data updates and upserts to maintain data integrity.
The Forms Data Pipeline consists of two parallel pipelines:
- Survey Distributions Data Pipeline: Handles transformations specific to survey distributions, ensuring alignment with the schema definitions in the target. This pipeline processes data related to surveys offered to customers.
- Forms Data Pipeline: Processes and transforms form data, typically the submitted data of a form.
This parallel processing is orchestrated in Airflow, allowing for efficient batch processing and separate transformation logic for schema and data.
Configurations
Configurations for the Forms Data Pipeline are provided in a forms_data_pipeline_config.yaml
file to ensure flexibility and adaptability. These configurations are designed for normal and ideal use cases and should be used as-is to achieve optimal results.
source:
  type: "mongodb"
  connection_string: "mongodb://root:Expertflow123@mongo-mongodb.ef-external.svc:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
  # connection string for replica support
  # connection_string: "mongodb://root:Expertflow123@mongo-mongodb-0.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-1.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-2.mongo-mongodb-headless.ef-external.svc.cluster.local:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
  replica_set_enabled: false
  replica_set: "expertflow"
  read_preference: "secondaryPreferred"
  queries:
    forms:
      database: "conversation-manager_db"
      collection_name: "CustomerTopicEvents"
      filter: {"cimEvent.data.body.type": "FORM_DATA"}
      replication_key: "recordCreationTime"
      transformation: "transform_forms_data"
      num_batches: 50
      query_keys:
        - "forms_gold"
    survey_distributions:
      database: "surveydb"
      collection_name: "surveydistributions"
      filter: {}
      replication_key: "updatedAt"
      transformation: "transform_survey_distributions"
      num_batches: 50
      query_keys:
  # TLS/SSL Configuration
  tls: true  # Set to false if you don't want to use TLS
  tls_ca_file: "/transflux/certificates/mongo_certs/mongodb-ca-cert"
  tls_cert_key_file: "/transflux/certificates/mongo_certs/client-pem"  # Includes both client certificate and private key
  batch_size: 30000  # Adjust as needed
target:
  type: "mysql"
  db_url: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db"
  enable_ssl: true  # Enable or disable SSL connections
  ssl_ca: "/transflux/certificates/mysql_certs/ca.pem"
  ssl_cert: "/transflux/certificates/mysql_certs/client-cert.pem"
  ssl_key: "/transflux/certificates/mysql_certs/client-key.pem"
configdb:
  type: "mysql"
  db_url: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db"
  enable_ssl: true  # Enable or disable SSL connections
  ssl_ca: "/transflux/certificates/mysql_certs/ca.pem"
  ssl_cert: "/transflux/certificates/mysql_certs/client-cert.pem"
  ssl_key: "/transflux/certificates/mysql_certs/client-key.pem"
transformation:
  schedule_interval: "*/15 * * * *"
Source Configuration
This section defines the MongoDB source settings for data extraction.
- type: Specifies the data source type. Example: "mongodb" indicates that MongoDB is the source.
- connection_string: The connection string for MongoDB. It includes the following components:
  - username and password for authentication.
  - MongoDB host and port.
  - Optional parameters such as authSource, tls, and tlsAllowInvalidHostnames.

  Example (non-replica):
  connection_string: "mongodb://root:Expertflow123@mongo-mongodb.ef-external.svc:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"

  Example (replica set support):
  connection_string: "mongodb://root:Expertflow123@mongo-mongodb-0.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-1.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-2.mongo-mongodb-headless.ef-external.svc.cluster.local:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
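The difference between the two forms is only the comma-separated host list. A minimal sketch of assembling such a connection string from its components; the build_mongo_uri helper is illustrative, not part of the pipeline:

```python
from urllib.parse import quote_plus


def build_mongo_uri(username, password, hosts, options):
    """Assemble a MongoDB connection string from its components.

    hosts: list of "host:port" strings -- one entry for a standalone
    server, several entries for a replica set.
    """
    auth = f"{quote_plus(username)}:{quote_plus(password)}"
    opts = "&".join(f"{k}={v}" for k, v in options.items())
    return f"mongodb://{auth}@{','.join(hosts)}/?{opts}"


# Standalone (non-replica) form, matching the config above:
uri = build_mongo_uri(
    "root",
    "Expertflow123",
    ["mongo-mongodb.ef-external.svc:27017"],
    {"authSource": "admin", "tls": "true", "tlsAllowInvalidHostnames": "true"},
)
```

Passing three "host:port" entries in the hosts list produces the replica-set form shown above.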
- replica_set_enabled: Indicates whether the replica set is enabled. Example: false.
- replica_set: Specifies the name of the replica set if enabled. Example: "expertflow".
- read_preference: Defines the read preference for MongoDB. Example: "secondaryPreferred" allows reading from secondary replicas.
- queries: A dictionary containing query configurations for the different pipelines.
  - forms: Configuration for the forms pipeline.
    - database: Name of the MongoDB database from which forms data is extracted. Example: "conversation-manager_db".
    - collection_name: Name of the MongoDB collection. Example: "CustomerTopicEvents".
    - filter: Query filter applied to fetch data. Example: {"cimEvent.data.body.type": "FORM_DATA"} (fetches all forms data records).
    - replication_key: Field used to track updates. Example: "recordCreationTime".
    - transformation: Transformation mapping function name. Example: "transform_forms_data".
    - num_batches: Number of data batches. Example: 50.
    - query_keys: Reserved for gold queries (for loading data into a gold table if needed). A query mapped to the key forms_gold is fetched from the query registry.
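The replication_key is what makes extraction incremental: after the first run, only documents whose key is newer than the last value seen need to be fetched. A minimal sketch of how the configured filter could be combined with such a watermark (the helper name and watermark handling are assumptions, not the pipeline's actual code):

```python
def build_incremental_filter(base_filter, replication_key, last_seen=None):
    """Merge the configured filter with a replication-key watermark.

    When last_seen is None (first run), the base filter is used as-is;
    otherwise only documents newer than the watermark are matched.
    """
    query = dict(base_filter)
    if last_seen is not None:
        query[replication_key] = {"$gt": last_seen}
    return query


# First run: fetch everything matching the configured filter.
first = build_incremental_filter(
    {"cimEvent.data.body.type": "FORM_DATA"}, "recordCreationTime")

# Later runs: fetch only documents created after the saved watermark.
incremental = build_incremental_filter(
    {"cimEvent.data.body.type": "FORM_DATA"},
    "recordCreationTime",
    last_seen="2024-01-01T00:00:00Z",
)
```

The resulting dictionary is usable directly as a pymongo find() filter.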
  - survey_distributions: Configuration for the survey distributions pipeline.
    - database: Name of the MongoDB database from which survey distribution data is extracted. Example: "surveydb".
    - collection_name: Name of the MongoDB collection. Example: "surveydistributions".
    - filter: Query filter applied to fetch data. Example: {} (fetches all records).
    - replication_key: Field used to track updates. Example: "updatedAt".
    - transformation: Transformation mapping function name. Example: "transform_survey_distributions".
    - num_batches: Number of data batches. Example: 50.
    - query_keys: Reserved for gold queries (for loading data into a gold table if needed).
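Because each query names its transformation as a string, a registry keyed by those names is a natural dispatch mechanism. A sketch under that assumption; the registry and the trivial transform bodies are illustrative, not the pipeline's real implementations:

```python
# Illustrative transforms only: the real mapping functions are far
# more involved than these field extractions.
def transform_forms_data(doc):
    body = doc.get("cimEvent", {}).get("data", {}).get("body", {})
    return {"id": doc.get("_id"), "type": body.get("type")}


def transform_survey_distributions(doc):
    return {"id": doc.get("_id"), "updated_at": doc.get("updatedAt")}


# Hypothetical registry mapping configured names to callables.
TRANSFORMATIONS = {
    "transform_forms_data": transform_forms_data,
    "transform_survey_distributions": transform_survey_distributions,
}


def apply_transformation(name, documents):
    """Look up the configured transformation by name and apply it."""
    transform = TRANSFORMATIONS[name]
    return [transform(d) for d in documents]
```

With this shape, adding a pipeline means registering one more function and referencing its name in the YAML.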
TLS/SSL Configuration: Enables secure communication with MongoDB.
- tls: Set to true to enable TLS. Example: true.
- tls_ca_file: Path to the CA certificate file. Example: "/transflux/certificates/mongo_certs/mongodb-ca-cert".
- tls_cert_key_file: Path to the file containing both the client certificate and the private key. Example: "/transflux/certificates/mongo_certs/client-pem".
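With pymongo these settings translate into client keyword arguments (tls, tlsCAFile, tlsCertificateKeyFile). A minimal sketch of that translation, assuming the YAML keys shown above; the helper itself is hypothetical:

```python
def mongo_tls_kwargs(cfg):
    """Translate the YAML TLS settings into pymongo client kwargs."""
    if not cfg.get("tls", False):
        return {}  # TLS disabled: no extra kwargs needed
    return {
        "tls": True,
        "tlsCAFile": cfg["tls_ca_file"],
        "tlsCertificateKeyFile": cfg["tls_cert_key_file"],
    }


kwargs = mongo_tls_kwargs({
    "tls": True,
    "tls_ca_file": "/transflux/certificates/mongo_certs/mongodb-ca-cert",
    "tls_cert_key_file": "/transflux/certificates/mongo_certs/client-pem",
})
# The kwargs would then be passed to the client, e.g.:
# client = pymongo.MongoClient(connection_string, **kwargs)
```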
Batch Size
- batch_size: Number of records processed per batch. Example: 30000.
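batch_size caps how many records move through the pipeline at a time. A minimal sketch of slicing a result set into such batches (the helper name is illustrative):

```python
def iter_batches(records, batch_size):
    """Yield successive slices of at most batch_size records."""
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]


# With batch_size = 30000, 65000 records split into batches of
# 30000, 30000 and 5000.
sizes = [len(b) for b in iter_batches(list(range(65000)), 30000)]
```

Tuning batch_size trades memory use per batch against the number of round trips to the target database.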
Target Configuration
This section defines the target MySQL database settings for data loading.
- type: Specifies the target database type. Example: "mysql".
- db_url: Connection string for the target MySQL database.
  Format: "mysql+pymysql://<username>:<password>@<host>:<port>/<database>".
  Example: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db".
- enable_ssl: Enables SSL communication with the MySQL database. Example: true.
- SSL Configuration:
  - ssl_ca: Path to the CA certificate. Example: "/transflux/certificates/mysql_certs/ca.pem".
  - ssl_cert: Path to the client certificate. Example: "/transflux/certificates/mysql_certs/client-cert.pem".
  - ssl_key: Path to the client private key. Example: "/transflux/certificates/mysql_certs/client-key.pem".
Config Database
The configuration database (configdb) stores metadata and operational settings for Airflow.
Its fields are identical to those of the target configuration.
Schedule Interval
- schedule_interval: Cron expression defining the pipeline's schedule in Airflow. Example: "*/15 * * * *" (runs every 15 minutes).
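To make the cron expression concrete, a tiny expander for its minute field shows exactly when "*/15 * * * *" fires (illustrative only; Airflow's scheduler interprets the full expression itself):

```python
def expand_minute_field(field):
    """Expand a cron minute field like "*/15" into the minutes it matches."""
    if field == "*":
        return list(range(60))  # every minute
    if field.startswith("*/"):
        step = int(field[2:])
        return list(range(0, 60, step))  # every `step` minutes
    return [int(v) for v in field.split(",")]  # explicit minute list


minutes = expand_minute_field("*/15")  # minutes at which the DAG runs
```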