
Forms Data Pipeline

The Forms Data Pipeline is responsible for managing and processing forms-related data. It extracts data from the source (MongoDB), applies the transformations needed to align with the schema requirements, and loads the data into the target database (MySQL). Key tasks include:

  • Validating data fields for completeness and accuracy.

  • Mapping form structure fields to the target schema.

  • Handling data updates and upserts to maintain data integrity.
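The upsert behavior mentioned above can be sketched as follows. This is an illustrative example only, not the pipeline's actual loader code: it builds a MySQL `INSERT ... ON DUPLICATE KEY UPDATE` statement so that re-processed records update existing rows instead of creating duplicates. Table and column names are hypothetical.

```python
# Illustrative sketch (not the pipeline's actual loader): building a MySQL
# upsert statement so re-processed records update rows instead of duplicating.
def build_upsert_sql(table, columns, key_column):
    """Build an INSERT ... ON DUPLICATE KEY UPDATE statement."""
    placeholders = ", ".join(["%s"] * len(columns))
    col_list = ", ".join(columns)
    updates = ", ".join(
        f"{c} = VALUES({c})" for c in columns if c != key_column
    )
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON DUPLICATE KEY UPDATE {updates}"
    )

# Hypothetical table and columns for demonstration.
sql = build_upsert_sql("forms", ["form_id", "status", "submitted_at"], "form_id")
print(sql)
```

A statement like this would then be executed per batch with a parameterized cursor, which is what keeps repeated pipeline runs idempotent.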

The Forms Data Pipeline consists of two parallel pipelines:

  • Survey Distributions Data Pipeline: Handles transformations specific to survey distributions, ensuring alignment with the schema definitions in the target. This pipeline processes data for surveys offered to customers.

  • Forms Data Pipeline: Processes and transforms form data, typically the submitted contents of a form.

This parallel processing is orchestrated in Airflow, allowing for efficient batch processing and separate transformation logic for schema and data.
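The parallel fan-out described above can be sketched with plain Python threads standing in for Airflow tasks. This is a hedged illustration, not the pipeline's actual DAG code; the function and pipeline names are hypothetical.

```python
# Hedged sketch of the parallel fan-out described above, using plain Python
# threads in place of Airflow tasks. Names are illustrative, not the
# pipeline's actual API.
from concurrent.futures import ThreadPoolExecutor

def run_pipeline(name):
    # In the real DAG, each branch would extract, transform, and load
    # its own data set; here we just return a status marker.
    return f"{name}: done"

pipelines = ["survey_distributions", "forms"]

# Both branches run concurrently and independently, mirroring the
# two parallel pipelines orchestrated in Airflow.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(run_pipeline, pipelines))

print(results)
```

In the actual deployment, Airflow's scheduler handles this fan-out, retries, and batch windows; the sketch only shows the independence of the two branches.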

Configurations

Configurations for the Forms Data Pipeline are provided in a forms_data_pipeline_config.yaml file to ensure flexibility and adaptability. These configurations are designed for normal and ideal use cases and should be used as-is to achieve optimal results.

CODE
source:
  type: "mongodb"
  connection_string: "mongodb://root:Expertflow123@mongo-mongodb.ef-external.svc:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"

  # connection string for replica support
  # connection_string: "mongodb://root:Expertflow123@mongo-mongodb-0.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-1.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-2.mongo-mongodb-headless.ef-external.svc.cluster.local:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
  
  replica_set_enabled: false
  replica_set: "expertflow"
  read_preference: "secondaryPreferred"
  queries:
    forms:
      database: "conversation-manager_db"
      collection_name: "CustomerTopicEvents"
      filter: {"cimEvent.data.body.type": "FORM_DATA"}
      replication_key: "recordCreationTime"
      transformation: "transform_forms_data"
      num_batches: 50
      query_keys:
       - "forms_gold"
    survey_distributions:
      database: "surveydb"
      collection_name: "surveydistributions"
      filter: {}
      replication_key: "updatedAt"
      transformation: "transform_survey_distributions"
      num_batches: 50
      query_keys:
  # TLS/SSL Configuration
  tls: true  # Set to false if you don't want to use TLS
  tls_ca_file: "/transflux/certificates/mongo_certs/mongodb-ca-cert"
  tls_cert_key_file: "/transflux/certificates/mongo_certs/client-pem"  # Includes both client certificate and private key

batch_size: 30000 # Adjust as needed

target:
  type: "mysql"
  db_url: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db"
  enable_ssl: true  # Enable or disable SSL connections
  ssl_ca: "/transflux/certificates/mysql_certs/ca.pem"
  ssl_cert: "/transflux/certificates/mysql_certs/client-cert.pem"
  ssl_key: "/transflux/certificates/mysql_certs/client-key.pem"

configdb:
  type: "mysql"
  db_url: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db"
  enable_ssl: true  # Enable or disable SSL connections
  ssl_ca: "/transflux/certificates/mysql_certs/ca.pem"
  ssl_cert: "/transflux/certificates/mysql_certs/client-cert.pem"
  ssl_key: "/transflux/certificates/mysql_certs/client-key.pem"

transformation:
schedule_interval: "*/15 * * * *"

Source Configuration

This section defines the MongoDB source settings for data extraction.

  • type: Specifies the data source type.
    Example: "mongodb" indicates that MongoDB is the source.

  • connection_string:
    The connection string for MongoDB. It includes the following components:

    • username and password for authentication.

    • MongoDB host and port.

    • Optional parameters like authSource, tls, and tlsAllowInvalidHostnames.

    Example (non-replica):

CODE
connection_string: "mongodb://root:Expertflow123@mongo-mongodb.ef-external.svc:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"

Example (replica set support):

CODE
connection_string: "mongodb://root:Expertflow123@mongo-mongodb-0.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-1.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-2.mongo-mongodb-headless.ef-external.svc.cluster.local:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
  • replica_set_enabled:
    Indicates if the replica set is enabled.
    Example: false

  • replica_set:
    Specifies the name of the replica set if enabled.
    Example: "expertflow"

  • read_preference:
    Defines the read preference for MongoDB.
    Example: "secondaryPreferred" allows reading from secondary replicas.

  • queries:
    A dictionary containing query configurations for different pipelines.

    • forms: Configurations for the forms pipeline.

      • database: Name of the MongoDB database from which forms data is extracted. Example: "conversation-manager_db".

      • collection_name: Name of the MongoDB collection. Example: "CustomerTopicEvents".

      • filter: Query filter applied to fetch data. Example: {"cimEvent.data.body.type": "FORM_DATA"} (to fetch all forms data records).

      • replication_key: Field used to track updates. Example: "recordCreationTime".

      • transformation: Transformation mapping function name. Example: "transform_forms_data".

      • num_batches: Number of data batches. Example: 50.

      • query_keys: Reserved for gold queries (for loading data into a gold table if needed). A query mapped to the key forms_gold is fetched from the query registry.

    • survey_distributions: Configurations for the survey distributions pipeline.

      • database: Name of the MongoDB database from which survey distribution data is extracted. Example: "surveydb".

      • collection_name: Name of the MongoDB collection. Example: "surveydistributions".

      • filter: Query filter applied to fetch data. Example: {} (to fetch all records).

      • replication_key: Field used to track updates. Example: "updatedAt".

      • transformation: Transformation mapping function name. Example: "transform_survey_distributions".

      • num_batches: Number of data batches. Example: 50.

      • query_keys: Reserved for gold queries (for loading data into a gold table if needed).

  • TLS/SSL Configuration:
    Enables secure communication with MongoDB.

    • tls: Set to true to enable TLS. Example: true.

    • tls_ca_file: Path to the CA certificate file.
      Example: "/transflux/certificates/mongo_certs/mongodb-ca-cert".

    • tls_cert_key_file: Path to the client certificate and private key file.
      Example: "/transflux/certificates/mongo_certs/client-pem".
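To make the filter semantics above concrete, the sketch below shows how a MongoDB-style filter such as {"cimEvent.data.body.type": "FORM_DATA"} selects documents by resolving the dotted field path. This is illustrative only; the sample documents are made up, and the real query runs inside MongoDB against the CustomerTopicEvents collection.

```python
# Illustrative only: how a MongoDB-style dotted-path filter selects
# documents. The sample documents below are made up.
def get_dotted(doc, path):
    """Resolve a dotted field path (e.g. 'a.b.c') against a nested dict."""
    for part in path.split("."):
        if not isinstance(doc, dict) or part not in doc:
            return None
        doc = doc[part]
    return doc

def matches(doc, flt):
    """True if every filter key equals the corresponding document value."""
    return all(get_dotted(doc, k) == v for k, v in flt.items())

docs = [
    {"cimEvent": {"data": {"body": {"type": "FORM_DATA"}}}},
    {"cimEvent": {"data": {"body": {"type": "CHAT_MESSAGE"}}}},
]
flt = {"cimEvent.data.body.type": "FORM_DATA"}

selected = [d for d in docs if matches(d, flt)]
print(len(selected))  # only the FORM_DATA document matches
```

An empty filter ({}), as used by the survey_distributions query, matches every document in the collection for the same reason: there are no key/value conditions to fail.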

Batch Size

  • batch_size:
    Number of records processed per batch.
    Example: 30000.
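As a rough illustration of how batch_size translates into round trips (the record count here is hypothetical):

```python
# Rough arithmetic for the batching knob above: with batch_size 30000,
# how many round trips a given extract requires. total_records is a
# hypothetical example value, not a real workload figure.
import math

batch_size = 30000
total_records = 100_000

num_round_trips = math.ceil(total_records / batch_size)
print(num_round_trips)  # 4 batches of at most 30000 records
```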

Target Configuration

This section defines the target MySQL database settings for data loading.

  • type: Specifies the target database type.
    Example: "mysql".

  • db_url: Connection string for the target MySQL database.
    Format: "mysql+pymysql://<username>:<password>@<host>:<port>/<database>".
    Example: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db".

  • enable_ssl: Enables SSL communication with the MySQL database.
    Example: true.

  • SSL Configuration:

    • ssl_ca: Path to the CA certificate. Example: "/transflux/certificates/mysql_certs/ca.pem".

    • ssl_cert: Path to the client certificate. Example: "/transflux/certificates/mysql_certs/client-cert.pem".

    • ssl_key: Path to the client private key. Example: "/transflux/certificates/mysql_certs/client-key.pem".
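The db_url format above can be assembled programmatically. One caveat worth showing in a sketch: credentials containing special characters (such as '@' or '/') must be percent-encoded, or the URL will parse incorrectly. The values below are placeholders taken from the example config, not real credentials.

```python
# Sketch of assembling the db_url format documented above. Credentials
# with special characters must be percent-encoded; values here are the
# placeholder credentials from the example config.
from urllib.parse import quote_plus

def build_db_url(user, password, host, port, database):
    return (
        f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )

url = build_db_url("elonmusk", "68i3nj7t", "192.168.1.182", 3306, "forms_db")
print(url)
```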

Config Database

The configuration database (configdb) stores metadata and operational settings for Airflow.

  • Fields are identical to the target configuration.

Schedule Interval

  • schedule_interval:
    Cron expression defining the pipeline's schedule in Airflow.
    Example: "*/15 * * * *" (runs every 15 minutes).
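A cron expression has five space-separated fields (minute, hour, day-of-month, month, day-of-week). The quick check below, illustrative only, expands the "*/15" minute field to show which minutes of each hour the pipeline fires on:

```python
# Quick sanity check of the schedule above: five cron fields, with
# "*/15" in the minute field meaning every 15th minute of each hour.
schedule_interval = "*/15 * * * *"

fields = schedule_interval.split()
assert len(fields) == 5  # minute, hour, day-of-month, month, day-of-week

minute_field = fields[0]
step = int(minute_field.split("/")[1])
fire_minutes = [m for m in range(60) if m % step == 0]
print(fire_minutes)  # [0, 15, 30, 45]
```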
