Skip to main content
Skip table of contents

Forms Data Pipeline

The Forms Data Pipeline is responsible for managing and processing forms-related data. It extracts data from sources (MongoDB) applies necessary transformations to align with schema requirements, and loads the data into target database (MySQL). Key tasks include:

  • Validating data fields for completeness and accuracy.

  • Mapping form structure fields to the target schema.

  • Handling data updates and upserts to maintain data integrity.

The Forms Data Pipeline consists of two parallel pipelines:

  • Survey distributions data Pipeline: Handles transformations specific to the Survey distributions, ensuring alignment with the schema definitions in the target. This pipeline processes data related to surveys offered to customers.

  • Forms Data Pipeline: Processes and transforms submitted form data,

  • Forms Schema Data Pipeline: Processes and transforms all of the form data (Required for QM)

This parallel processing is orchestrated in Data Platform, allowing for efficient batch processing and separate transformation logic for schema and data.

Configurations

Configurations for the Forms Data Pipeline are provided in a forms_data_pipeline_config.yaml format to ensure flexibility and adaptability. These configurations are designed for normal and ideal use cases and are advised to be used as-is to achieve optimal results.

CODE
source:
  type: "mongodb"
  connection_string: "mongodb://root:Expertflow123@mongo-mongodb.ef-external.svc:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"

  # connection string for replica support
  # connection_string: "mongodb://root:Expertflow123@mongo-mongodb-0.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-1.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-2.mongo-mongodb-headless.ef-external.svc.cluster.local:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
  
  replica_set_enabled: false
  replica_set: "expertflow"
  read_preference : "secondaryPreferred"
  queries:
    forms:
      database: "conversation-manager_db"
      collection_name: "CustomerTopicEvents"
      filter: {"cimEvent.data.body.type": "FORM_DATA"}
      replication_key: "recordCreationTime"
      transformation: "transform_forms_data"
      num_batches: 50
      query_keys:
       - "forms_gold"
    survey_distributions:
      database: "surveydb"
      collection_name: "surveydistributions"
      filter: {}
      replication_key: "updatedAt"
      transformation: "transform_survey_distributions"
      num_batches: 50
      query_keys:
    forms_schema:
      database: "adminPanel"
      collection_name: "forms"
      filter: {}
      replication_key: "updatedAt"
      transformation: "transform_forms_schema_data"
      num_batches: 50
      query_keys:
      
  # TLS/SSL Configuration
  tls: true  # Set to false if you don't want to use TLS
  tls_ca_file: "/transflux/certificates/mongo_certs/mongodb-ca-cert"
  tls_cert_key_file: "/transflux/certificates/mongo_certs/client-pem"  # Includes both client certificate and private key

batch_size: 30000 # Adjust as needed

target:
  type: "mysql"
  db_url: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db"
  enable_ssl: true  # Enable or disable SSL connections
  ssl_ca: "/transflux/certificates/mysql_certs/ca.pem"
  ssl_cert: "/transflux/certificates/mysql_certs/client-cert.pem"
  ssl_key: "/transflux/certificates/mysql_certs/client-key.pem"

configdb:
  type: "mysql"
  db_url: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db"
  enable_ssl: true  # Enable or disable SSL connections
  ssl_ca: "/transflux/certificates/mysql_certs/ca.pem"
  ssl_cert: "/transflux/certificates/mysql_certs/client-cert.pem"
  ssl_key: "/transflux/certificates/mysql_certs/client-key.pem"

transformation:
schedule_interval: "*/15 * * * *"

Source Configuration

This section defines the MongoDB source settings for data extraction.

  • type: Specifies the data source type.
    Example: "mongodb" indicates that MongoDB is the source.

  • connection_string:
    The connection string for MongoDB. It includes the following components:

    • username and password for authentication.

    • MongoDB host and port.

    • Optional parameters like authSource, tls, and tlsAllowInvalidHostnames.

    Example (non-replica):

CODE
connection_string: "mongodb://root:Expertflow123@mongo-mongodb.ef-external.svc:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"

Example (replica set support):

CODE
connection_string: "mongodb://root:Expertflow123@mongo-mongodb-0.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-1.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-2.mongo-mongodb-headless.ef-external.svc.cluster.local:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
  • replica_set_enabled:
    Indicates if the replica set is enabled.
    Example: false

  • replica_set:
    Specifies the name of the replica set if enabled.
    Example: "expertflow"

  • read_preference:
    Defines the read preference for MongoDB.
    Example: "secondaryPreferred" allows reading from secondary replicas.

  • queries:
    A dictionary containing query configurations for different pipelines.

    • forms: Configurations for the forms pipeline.

      • database: Name of the MongoDB database from where forms data is being extracted. Example: "conversation-manager_db".

      • collection_name: Name of the MongoDB collection. Example: "CustomerTopicEvents".

      • filter: Query filter applied to fetch data. Example: {"cimEvent.data.body.type": "FORM_DATA"} (to fetch all forms data records).

      • replication_key: Field used to track updates. Example: "recordCreationTime".

      • transformation: Transformation mapping function name. Example: "transform_forms_data".

      • num_batches: Number of data batches. Example: 50.

      • query_keys: Reserved for gold queries ( for loading data in gold table if needed ). Mapping query with key forms_gold would be fetched from query registry.

    • survey_distributions: Configurations for the survey distribution pipeline.

      • database: Name of the MongoDB database from where survey distribution data is being extracted. Example: "surveydb".

      • collection_name: Name of the MongoDB collection. Example: "surveydistributions".

      • filter: Query filter applied to fetch data. Example: {} (to fetch all records).

      • replication_key: Field used to track updates. Example: "updatedAt".

      • transformation: Transformation mapping function name. Example: "transform_survey_distributions".

      • num_batches: Number of data batches. Example: 50.

      • query_keys: Reserved for gold queries ( for loading data in gold table if needed ).

    • forms_schema: Configurations for the forms schema pipeline.

      • database: Name of the MongoDB database from where forms schema data is being extracted. Example: "adminPanel".

      • collection_name: Name of the MongoDB collection. Example: "forms".

      • filter: Query filter applied to fetch data. Example: {} (to fetch all records).

      • replication_key: Field used to track updates. Example: "updatedAt".

      • transformation: Transformation mapping function name. Example: "transform_forms_schema_data".

      • num_batches: Number of data batches. Example: 50.

      • query_keys: Reserved for gold queries ( for loading data in gold table if needed ).

  • TLS/SSL Configuration:
    Enables secure communication with MongoDB.

    • tls: Set to true to enable TLS. Example: true.

    • tls_ca_file: Path to the CA certificate file.
      Example: "/transflux/certificates/mongo_certs/mongodb-ca-cert".

    • tls_cert_key_file: Path to the client certificate and private key file.
      Example: "/transflux/certificates/mongo_certs/client-pem".

Batch Size

  • batch_size:
    Number of records processed per batch.
    Example: 30000.

Target Configuration

This section defines the target MySQL database settings for data loading.

  • type: Specifies the target database type.
    Example: "mysql".

  • db_url: Connection string for the target MySQL database.
    Format: "mysql+pymysql://<username>:<password>@<host>:<port>/<database>".
    Example: "mysql+pymysql://elonmusk:68i3nj7t@192.168.1.182:3306/forms_db".

  • enable_ssl: Enables SSL communication with the MySQL database.
    Example: true.

  • SSL Configuration:

    • ssl_ca: Path to the CA certificate. Example: "/transflux/certificates/mysql_certs/ca.pem".

    • ssl_cert: Path to the client certificate. Example: "/transflux/certificates/mysql_certs/client-cert.pem".

    • ssl_key: Path to the client private key. Example: "/transflux/certificates/mysql_certs/client-key.pem".

Config Database

The configuration database (configdb) stores metadata and operational settings for Data Platform.

  • Fields are identical to the target configuration.

Schedule Interval

  • schedule_interval:
    Cron expression defining the pipeline's schedule in Data Platform.
    Example: "*/15 * * * *" (runs every 15 minutes).

JavaScript errors detected

Please note, these errors can depend on your browser setup.

If this problem persists, please contact our support.