The Forms Data Pipeline is responsible for managing and processing forms-related data. It extracts data from the source (MongoDB), applies the transformations needed to align with the schema requirements, and loads the data into the target database (MySQL). Key tasks include:
- Validating data fields for completeness and accuracy.
- Mapping form structure fields to the target schema.
- Handling data updates and upserts to maintain data integrity (see the upsert sketch below).
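The upsert behaviour can be pictured with a short sketch, assuming SQLAlchemy with PyMySQL (as the config's mysql+pymysql URL suggests) and a hypothetical forms table keyed by form_id; the pipeline's actual load logic may differ:

```python
# Minimal sketch of a MySQL upsert, assuming a hypothetical `forms` table
# keyed by `form_id`; the pipeline's actual tables and columns may differ.
from sqlalchemy import create_engine, text

engine = create_engine("mysql+pymysql://user:password@mysql-host:3306/forms_db")

upsert_sql = text("""
    INSERT INTO forms (form_id, form_name, submitted_at)
    VALUES (:form_id, :form_name, :submitted_at)
    ON DUPLICATE KEY UPDATE
        form_name = VALUES(form_name),
        submitted_at = VALUES(submitted_at)
""")

with engine.begin() as conn:  # transaction commits automatically on success
    conn.execute(upsert_sql, {
        "form_id": "frm-001",
        "form_name": "Customer Feedback",
        "submitted_at": "2024-01-01 10:00:00",
    })
```

Rows with a new form_id are inserted, while rows whose key already exists are updated in place, which is what keeps repeated pipeline runs idempotent.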
The Forms Data Pipeline consists of two parallel pipelines:
- Survey Distributions Data Pipeline: Handles transformations specific to survey distributions, ensuring alignment with the schema definitions in the target. This pipeline processes data related to surveys offered to customers.
- Forms Data Pipeline: Processes and transforms form data, which is usually the submitted data of a form.
This parallel processing is orchestrated in Airflow, allowing for efficient batch processing and separate transformation logic for schema and data.
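As a rough illustration of that orchestration, the two pipelines could run as independent tasks inside a single DAG; the task and callable names below are hypothetical and this is not the DAG actually shipped with the pipeline:

```python
# Simplified sketch of two parallel pipelines in one Airflow DAG.
# Task and callable names are hypothetical; the real DAG may differ.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def run_forms_pipeline(**context):
    ...  # extract, transform, and load forms data

def run_survey_distributions_pipeline(**context):
    ...  # extract, transform, and load survey distribution data

with DAG(
    dag_id="forms_data_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="*/15 * * * *",  # matches transformation.schedule_interval
    catchup=False,
) as dag:
    forms_task = PythonOperator(
        task_id="forms",
        python_callable=run_forms_pipeline,
    )
    survey_distributions_task = PythonOperator(
        task_id="survey_distributions",
        python_callable=run_survey_distributions_pipeline,
    )
    # No dependency is declared between the two tasks,
    # so Airflow is free to run them in parallel.
```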
Configurations
Configurations for the Forms Data Pipeline are provided in the forms_data_pipeline_config.yaml file to ensure flexibility and adaptability. These configurations are designed for the normal and ideal use cases and are best used as-is to achieve optimal results.
source:
  type: "mongodb"
  connection_string: "mongodb://root:Expertflow123@mongo-mongodb.ef-external.svc:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
  # connection string for replica support
  # connection_string: "mongodb://root:Expertflow123@mongo-mongodb-0.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-1.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-2.mongo-mongodb-headless.ef-external.svc.cluster.local:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
  replica_set_enabled: false
  replica_set: "expertflow"
  read_preference: "secondaryPreferred"
  queries:
    forms:
      database: "conversation-manager_db"
      collection_name: "CustomerTopicEvents"
      filter: {"cimEvent.data.body.type": "FORM_DATA"}
      replication_key: "recordCreationTime"
      transformation: "transform_forms_data"
      num_batches: 50
      query_keys:
        - "forms_gold"
    survey_distributions:
      database: "surveydb"
      collection_name: "surveydistributions"
      filter: {}
      replication_key: "updatedAt"
      transformation: "transform_survey_distributions"
      num_batches: 50
      query_keys:
  # TLS/SSL Configuration
  tls: true  # Set to false if you don't want to use TLS
  tls_ca_file: "/transflux/certificates/mongo_certs/mongodb-ca-cert"
  tls_cert_key_file: "/transflux/certificates/mongo_certs/client-pem"  # Includes both client certificate and private key
  batch_size: 30000  # Adjust as needed

target:
  type: "mysql"
  db_url: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db"
  enable_ssl: true  # Enable or disable SSL connections
  ssl_ca: "/transflux/certificates/mysql_certs/ca.pem"
  ssl_cert: "/transflux/certificates/mysql_certs/client-cert.pem"
  ssl_key: "/transflux/certificates/mysql_certs/client-key.pem"

configdb:
  type: "mysql"
  db_url: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db"
  enable_ssl: true  # Enable or disable SSL connections
  ssl_ca: "/transflux/certificates/mysql_certs/ca.pem"
  ssl_cert: "/transflux/certificates/mysql_certs/client-cert.pem"
  ssl_key: "/transflux/certificates/mysql_certs/client-key.pem"

transformation:
  schedule_interval: "*/15 * * * *"
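For reference, the file can be loaded and inspected with PyYAML; this is a generic sketch, not the pipeline's own config loader:

```python
# Generic sketch of loading and inspecting the configuration file;
# the pipeline's own loader may differ.
import yaml

with open("forms_data_pipeline_config.yaml") as f:
    config = yaml.safe_load(f)

print(config["source"]["queries"]["forms"]["collection_name"])  # CustomerTopicEvents
print(config["target"]["db_url"])                               # target MySQL URL
print(config["transformation"]["schedule_interval"])            # */15 * * * *
```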
Source Configuration
This section defines the MongoDB source settings for data extraction.
- type: Specifies the data source type. Example: "mongodb" indicates that MongoDB is the source.
- connection_string: The connection string for MongoDB. It includes the following components:
  - username and password for authentication.
  - MongoDB host and port.
  - Optional parameters like authSource, tls, and tlsAllowInvalidHostnames.

  Example (non-replica):
  connection_string: "mongodb://root:Expertflow123@mongo-mongodb.ef-external.svc:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"

  Example (replica set support):
  connection_string: "mongodb://root:Expertflow123@mongo-mongodb-0.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-1.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-2.mongo-mongodb-headless.ef-external.svc.cluster.local:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
- replica_set_enabled: Indicates whether the replica set is enabled. Example: false
- replica_set: Specifies the name of the replica set, if enabled. Example: "expertflow"
- read_preference: Defines the read preference for MongoDB. Example: "secondaryPreferred" allows reading from secondary replicas.
- queries: A dictionary containing query configurations for the different pipelines.
  - forms: Configuration for the forms pipeline.
    - database: Name of the MongoDB database from which forms data is extracted. Example: "conversation-manager_db"
    - collection_name: Name of the MongoDB collection. Example: "CustomerTopicEvents"
    - filter: Query filter applied to fetch data. Example: {"cimEvent.data.body.type": "FORM_DATA"} (fetches only forms data records).
    - replication_key: Field used to track updates. Example: "recordCreationTime"
    - transformation: Transformation mapping function name. Example: "transform_forms_data"
    - num_batches: Number of data batches. Example: 50
    - query_keys: Reserved for gold queries (for loading data into a gold table, if needed). The query mapped to the key forms_gold is fetched from the query registry.
  - survey_distributions: Configuration for the survey distributions pipeline.
    - database: Name of the MongoDB database from which survey distribution data is extracted. Example: "surveydb"
    - collection_name: Name of the MongoDB collection. Example: "surveydistributions"
    - filter: Query filter applied to fetch data. Example: {} (fetches all records).
    - replication_key: Field used to track updates. Example: "updatedAt"
    - transformation: Transformation mapping function name. Example: "transform_survey_distributions"
    - num_batches: Number of data batches. Example: 50
    - query_keys: Reserved for gold queries (for loading data into a gold table, if needed).
- TLS/SSL Configuration: Enables secure communication with MongoDB.
  - tls: Set to true to enable TLS. Example: true
  - tls_ca_file: Path to the CA certificate file. Example: "/transflux/certificates/mongo_certs/mongodb-ca-cert"
  - tls_cert_key_file: Path to the client certificate and private key file. Example: "/transflux/certificates/mongo_certs/client-pem"
- Batch Size:
  - batch_size: Number of records processed per batch. Example: 30000 (see the extraction sketch after this list).
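To show how these source settings fit together, here is an illustrative extraction sketch with pymongo: it opens a TLS-secured connection, applies the configured filter plus an incremental condition on the replication key, and reads one batch. It is an assumption about how extraction could look, not the pipeline's actual code.

```python
# Illustrative extraction sketch driven by the source settings above;
# the pipeline's real extraction logic may differ.
import yaml
from pymongo import MongoClient

with open("forms_data_pipeline_config.yaml") as f:
    source = yaml.safe_load(f)["source"]

client = MongoClient(
    source["connection_string"],                  # non-replica or replica-set URI
    tls=source["tls"],
    tlsCAFile=source["tls_ca_file"],
    tlsCertificateKeyFile=source["tls_cert_key_file"],
    readPreference=source["read_preference"],
)

q = source["queries"]["forms"]
collection = client[q["database"]][q["collection_name"]]

# Incremental pull: only documents newer than the last processed value
# of the replication key (None here, i.e. a first full pull).
last_seen = None
mongo_filter = dict(q["filter"])
if last_seen is not None:
    mongo_filter[q["replication_key"]] = {"$gt": last_seen}

cursor = (
    collection.find(mongo_filter)
    .sort(q["replication_key"], 1)
    .limit(source["batch_size"])
)
batch = list(cursor)
print(f"fetched {len(batch)} documents")
```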
Target Configuration
This section defines the target MySQL database settings for data loading.
- type: Specifies the target database type. Example: "mysql"
- db_url: Connection string for the target MySQL database. Format: "mysql+pymysql://<username>:<password>@<host>:<port>/<database>". Example: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db"
- enable_ssl: Enables SSL communication with the MySQL database. Example: true
- SSL Configuration (see the engine sketch after this list):
  - ssl_ca: Path to the CA certificate. Example: "/transflux/certificates/mysql_certs/ca.pem"
  - ssl_cert: Path to the client certificate. Example: "/transflux/certificates/mysql_certs/client-cert.pem"
  - ssl_key: Path to the client private key. Example: "/transflux/certificates/mysql_certs/client-key.pem"
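A short sketch of how these target settings might be turned into a database engine, assuming SQLAlchemy with PyMySQL (PyMySQL 1.0+ accepts ssl_ca/ssl_cert/ssl_key connect arguments); the pipeline's actual loader may differ:

```python
# Sketch of building a SQLAlchemy engine from the target settings above;
# the pipeline's actual loader may differ.
from sqlalchemy import create_engine

target = {
    "db_url": "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db",
    "enable_ssl": True,
    "ssl_ca": "/transflux/certificates/mysql_certs/ca.pem",
    "ssl_cert": "/transflux/certificates/mysql_certs/client-cert.pem",
    "ssl_key": "/transflux/certificates/mysql_certs/client-key.pem",
}

connect_args = {}
if target["enable_ssl"]:
    # PyMySQL accepts these ssl_* keyword arguments directly.
    connect_args = {
        "ssl_ca": target["ssl_ca"],
        "ssl_cert": target["ssl_cert"],
        "ssl_key": target["ssl_key"],
    }

engine = create_engine(target["db_url"], connect_args=connect_args)

with engine.connect() as conn:
    conn.exec_driver_sql("SELECT 1")  # simple connectivity check
```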
Config Database
The configuration database (configdb) stores metadata and operational settings for Airflow.
- Fields are identical to the target configuration.
Schedule Interval
- schedule_interval: Cron expression defining the pipeline's schedule in Airflow. Example: "*/15 * * * *" (runs every 15 minutes; expanded in the sketch below).