Messages Data Pipeline

The Messages Data Pipeline manages and processes all 'messages' data from CX. It extracts data from the source (MongoDB), applies the transformations needed to match the target schema, and loads the data into the target database (MySQL or MSSQL).

Key tasks include:

  • Validating data fields for accuracy and ensuring required fields are present.

  • Mapping data structure fields to the target schema.

  • Handling data updates and upserts to maintain data integrity and avoid data duplication.
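The validation and upsert tasks above can be illustrated with a minimal sketch. It uses Python's built-in sqlite3 module as a stand-in for the MySQL or MSSQL target, and the messages table, its columns, and REQUIRED_FIELDS are hypothetical names chosen for illustration; the pipeline's actual schema and loading code are not shown in this document.

```python
import sqlite3

# Hypothetical required fields; the real pipeline's schema may differ.
REQUIRED_FIELDS = ("id", "timestamp", "body")


def validate(record):
    """Return the names of required fields that are missing or None."""
    return [f for f in REQUIRED_FIELDS if record.get(f) is None]


def upsert_messages(conn, records):
    """Insert new rows and update existing ones, keyed on the primary key."""
    valid = [r for r in records if not validate(r)]
    conn.executemany(
        """
        INSERT INTO messages (id, timestamp, body)
        VALUES (:id, :timestamp, :body)
        ON CONFLICT(id) DO UPDATE SET
            timestamp = excluded.timestamp,
            body = excluded.body
        """,
        valid,
    )
    conn.commit()
    return len(valid)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (id TEXT PRIMARY KEY, timestamp TEXT, body TEXT)"
)

first_run = [
    {"id": "m1", "timestamp": "2024-01-01T10:00:00Z", "body": "hello"},
    {"id": "m2", "timestamp": "2024-01-01T10:01:00Z", "body": "hi"},
    {"id": "m3", "timestamp": None, "body": "skipped: no timestamp"},
]
# The second run re-delivers m1 with an updated body.
second_run = [
    {"id": "m1", "timestamp": "2024-01-01T10:05:00Z", "body": "hello (edited)"},
]

loaded_first = upsert_messages(conn, first_run)
loaded_second = upsert_messages(conn, second_run)
rows = conn.execute("SELECT id, body FROM messages ORDER BY id").fetchall()
```

After both runs the table holds two rows, with m1 updated in place rather than duplicated. The ON CONFLICT clause is SQLite syntax; MySQL expresses the same idea with INSERT ... ON DUPLICATE KEY UPDATE, and MSSQL typically uses MERGE.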

This data pipeline follows an ELT approach (Extract -> Load -> Transform). Raw data is extracted from the source and loaded into the target database without any flattening or transformation. This raw data is called the Bronze Layer data.

For the transformation step, instead of generic transformation scripts, the pipeline uses dbt (Data Build Tool) as its transformation framework. This transformation layer creates the silver layer tables, one for each message type, that are used for reporting.

Schema

The table schemas, along with a description of each field in each data layer, are as follows:

Configurations (For Bronze and Silver Level Data)

Configurations for the Messages Data Pipeline are provided in YAML format so that they can be adapted to different deployments. The values below are designed for typical use cases, and using them as-is is recommended.

CODE
source:
  type: "mongodb"
  connection_string: "mongodb://root:Expertflow123@192.168.2.202:31320/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"

  replica_set_enabled: false  # get from global configs  ${MONGODB_REPLICASET_ENABLED}
  replica_set: "expertflow"  # get from global configs  MONGODB_REPLICASET
  read_preference: "secondaryPreferred"  # get from global configs  MONGODB_READ_PREFERENCE
  queries:
    messages_bronze:
      database: "conversation-manager_db"
      collection_name: "ConversationActivities"
      filter: { "activity.type": "MESSAGE" }
      replication_key: "timestamp"
      transformation: "transform_messages_data"
      num_batches: 50
      query_keys:
  # TLS/SSL Configuration
  tls: true  # Set to false if you don't want to use TLS
  tls_ca_file: "/transflux/certificates/mongo_certs/mongodb-ca-cert"
  tls_cert_key_file: "/transflux/certificates/mongo_certs/client-pem"  # Includes both client certificate and private key

batch_size: 10000 # Adjust as needed

target:
  type: "mysql"
  db_url: "mysql+pymysql://<your-db-username>:<password>@<host>:<port>/<mysql-db-name>"
  #db_url: "mssql+pyodbc://<your-db-username>:<password>@<host>:<port>/<mssql-db-name>?<driver_name>"  
  enable_ssl: false  # Enable or disable SSL connections
  ssl_ca: "/transflux/certificates/mysql_certs/ca.pem"
  ssl_cert: "/transflux/certificates/mysql_certs/client-cert.pem"
  ssl_key: "/transflux/certificates/mysql_certs/client-key.pem"

configdb:
  type: "mysql"
  db_url: "mysql+pymysql://<your-db-username>:<password>@<host>:<port>/<mysql-db-name>"
  #db_url: "mssql+pyodbc://<your-db-username>:<password>@<host>:<port>/<mssql-db-name>?<driver_name>"
  enable_ssl: false
  ssl_ca: "/transflux/certificates/mysql_certs/ca.pem"
  ssl_cert: "/transflux/certificates/mysql_certs/client-cert.pem"
  ssl_key: "/transflux/certificates/mysql_certs/client-key.pem"

transformation:
schedule_interval: "*/15 * * * *"
time_offset: "0"
interval_minutes: 0
dbt_model: 
  - agent_messages_silver
  - customer_messages_silver
  - bot_messages_silver
  - action_messages_silver

Source Configuration

This section defines the MongoDB source settings for data extraction.

  • type: Specifies the data source type.
    Example: "mongodb" indicates that MongoDB is the source.

  • connection_string:
    The connection string for MongoDB. It includes the following components:

    • username and password for authentication.

    • MongoDB host and port.

    • Optional parameters like authSource, tls, and tlsAllowInvalidHostnames.

    Example (non-replica):

    CODE
    connection_string: "mongodb://root:Expertflow123@mongo-mongodb.ef-external.svc:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"

    Example (replica set support):

    CODE
    connection_string: "mongodb://root:Expertflow123@mongo-mongodb-0.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-1.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-2.mongo-mongodb-headless.ef-external.svc.cluster.local:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
  • replica_set_enabled:
    Indicates if the replica set is enabled.
    Example: false

  • replica_set:
    Specifies the name of the replica set if enabled.
    Example: "expertflow"

  • read_preference:
    Defines the read preference for MongoDB.
    Example: "secondaryPreferred" allows reading from secondary replicas.

  • queries:
    A dictionary containing the query configurations for a pipeline. The queries differ for each data pipeline and are given in each pipeline's respective document.

    • your_target_table_name: Configurations for the messages data pipeline, keyed by the target table name (messages_bronze in the configuration above).

      • database: Name of the MongoDB database from which the activities data is extracted.

      • collection_name: Name of the MongoDB collection.

      • filter: Query filter applied to fetch data.

      • replication_key: Field used to track updates.

      • transformation: Transformation function name.

      • num_batches: Number of data batches.

      • query_keys: Reserved for gold queries (for loading data into the gold table, if needed).

  • TLS/SSL Configuration:
    Enables secure communication with MongoDB.

    • tls: Set to true to enable TLS. Example: true.

    • tls_ca_file: Path to the CA certificate file.
      Example: "/transflux/certificates/mongo_certs/mongodb-ca-cert".

    • tls_cert_key_file: Path to the client certificate and private key file.
      Example: "/transflux/certificates/mongo_certs/client-pem".
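To make the connection_string components listed above concrete, the following sketch splits a MongoDB URI into credentials, hosts, and options using only the Python standard library. The function name describe_mongo_uri and the example hosts and credentials are placeholders invented for illustration, and the parser is a simplified one that does not handle IPv6 literals; it is not the pipeline's own parsing code.

```python
from urllib.parse import urlsplit, parse_qs, unquote


def describe_mongo_uri(uri):
    """Split a mongodb:// URI into credentials, hosts, and options."""
    parts = urlsplit(uri)
    if parts.scheme != "mongodb":
        raise ValueError("expected a mongodb:// connection string")

    # Replica-set URIs list several host:port pairs separated by commas,
    # so the host list is split by hand rather than via parts.hostname.
    userinfo, _, hostlist = parts.netloc.rpartition("@")
    username, _, password = userinfo.partition(":")
    hosts = []
    for entry in hostlist.split(","):
        host, _, port = entry.partition(":")
        # 27017 is MongoDB's default port when none is given.
        hosts.append((host, int(port) if port else 27017))

    options = {k: v[-1] for k, v in parse_qs(parts.query).items()}
    return {
        "username": unquote(username),
        "has_password": bool(password),
        "hosts": hosts,
        "options": options,
    }


# Placeholder hosts and credentials, not a real deployment.
single = describe_mongo_uri(
    "mongodb://app_user:secret@mongo.example.internal:27017/"
    "?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
)
replica = describe_mongo_uri(
    "mongodb://app_user:secret@mongo-0.example.internal:27017,"
    "mongo-1.example.internal:27017,mongo-2.example.internal:27017/"
    "?authSource=admin&tls=true"
)
```

A check like this can catch a malformed host list or a missing authSource option before the pipeline attempts to connect.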

Batch Size

  • batch_size:
    Number of records processed per batch.
    Example: 10000.
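One plausible way batch_size and replication_key interact is sketched below: records newer than a stored bookmark are selected, ordered by the replication key, and handed on in chunks of at most batch_size. This is an assumption about the general technique, not a description of the pipeline's internals; the helper names, the bookmark parameter, and the in-memory source list are all hypothetical.

```python
from itertools import islice


def iter_batches(records, batch_size):
    """Yield consecutive lists of at most batch_size records."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def extract_incremental(records, replication_key, bookmark, batch_size):
    """Yield batches of records newer than the bookmark, oldest first."""
    fresh = sorted(
        (r for r in records if r[replication_key] > bookmark),
        key=lambda r: r[replication_key],
    )
    yield from iter_batches(fresh, batch_size)


# Hypothetical in-memory stand-in for the MongoDB collection.
source = [{"_id": i, "timestamp": 1000 + i} for i in range(25)]

batches = list(
    extract_incremental(source, "timestamp", bookmark=1002, batch_size=10)
)
batch_sizes = [len(b) for b in batches]
# The last record seen becomes the bookmark for the next run.
new_bookmark = batches[-1][-1]["timestamp"]
```

With 22 records newer than the bookmark and a batch size of 10, the sketch produces batches of 10, 10, and 2 records. A larger batch_size means fewer round trips at the cost of more memory per batch.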

Target Configuration

This section defines the target database settings for data loading. The target database type can be "mysql" or "mssql", depending on your use case, and the configuration differs between the two.

For MySQL:

  • type: Specifies the target database type.
    Example: "mysql".

  • db_url: Connection string for the target MySQL database.
    Format: "mysql+pymysql://<username>:<password>@<host>:<port>/<database>".
    Example: "mysql+pymysql://user:Expertflow123@192.168.2.202:3306/messages_db".

  • enable_ssl: Enables SSL communication with the MySQL database. It can be true or false, depending on your database setup.
    Example: true.

  • SSL Configuration:

    • ssl_ca: Path to the CA certificate. Example: "/transflux/certificates/mysql_certs/ca.pem".

    • ssl_cert: Path to the client certificate. Example: "/transflux/certificates/mysql_certs/client-cert.pem".

    • ssl_key: Path to the client private key. Example: "/transflux/certificates/mysql_certs/client-key.pem".

For MSSQL:

  • type: Specifies the target database type.
    Example: "mssql".

  • db_url: Connection string for the target MSSQL database.
    Format: "mssql+pyodbc://<your-db-username>:<password>@<host>:<port>/<mssql-db-name>?<driver_name>".
    Example: "mssql+pyodbc://sa:Expertflow464@192.168.1.77:1433/messages_db?driver=ODBC+Driver+17+for+SQL+Server"

  • enable_ssl: Enables SSL communication with the MSSQL database. It can be true or false, depending on your database setup.
    Example: true.

  • SSL Configuration:

    • ssl_ca: Path to the CA certificate. Example: "/transflux/certificates/mssql_certs/ca.pem".

    • ssl_cert: Path to the client certificate. Example: "/transflux/certificates/mssql_certs/client-cert.pem".

    • ssl_key: Path to the client private key. Example: "/transflux/certificates/mssql_certs/client-key.pem".
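The target settings above can be sanity-checked before a run. The sketch below is one hypothetical way to do so with the Python standard library: check_target is an invented helper, the hosts and credentials are placeholders, and the rule that enable_ssl requires all three SSL paths is an assumption rather than documented pipeline behavior.

```python
from urllib.parse import urlsplit, parse_qs


def check_target(target):
    """Return a list of problems found in a target configuration dict."""
    problems = []
    url = urlsplit(target["db_url"])
    # "mysql+pymysql" -> dialect "mysql", driver "pymysql"
    dialect, _, driver = url.scheme.partition("+")

    if dialect != target["type"]:
        problems.append(
            f"type is {target['type']!r} but db_url uses {dialect!r}"
        )
    if not url.hostname or url.port is None:
        problems.append("db_url needs both a host and a port")
    if not url.path.strip("/"):
        problems.append("db_url needs a database name")
    if dialect == "mssql" and "driver" not in parse_qs(url.query):
        problems.append("mssql db_url needs a driver=... query parameter")
    # Assumption: when SSL is enabled, all three certificate paths are needed.
    if target.get("enable_ssl"):
        for key in ("ssl_ca", "ssl_cert", "ssl_key"):
            if not target.get(key):
                problems.append(f"enable_ssl is true but {key} is not set")
    return problems


# Placeholder hosts and credentials, not a real deployment.
mysql_ok = {
    "type": "mysql",
    "db_url": "mysql+pymysql://etl_user:secret@db.example.internal:3306/messages_db",
    "enable_ssl": False,
}
mssql_no_driver = {
    "type": "mssql",
    "db_url": "mssql+pyodbc://etl_user:secret@db.example.internal:1433/messages_db",
    "enable_ssl": False,
}
mismatched = dict(mysql_ok, type="mssql")

mysql_problems = check_target(mysql_ok)
mssql_problems = check_target(mssql_no_driver)
mismatch_problems = check_target(mismatched)
```

The same function can be applied to the configdb section, since its fields are identical to those of the target.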

Config Database

The configuration database (configdb) stores metadata and operational settings for the Data Platform.

  • Fields are identical to the target configuration.

Schedule Interval

  • schedule_interval:
    Cron expression defining the pipeline's schedule in the Data Platform.
    Example: "*/15 * * * *" (runs every 15 minutes).
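To show what a schedule such as "*/15 * * * *" means in practice, the sketch below expands a single cron field into the values it matches. It is a simplified illustration that supports only "*", plain numbers, ranges, comma lists, and "/step", and its matches helper inspects only the minute and hour fields; the Data Platform's scheduler does its own parsing, and the function names here are hypothetical.

```python
def expand_cron_field(field, low, high):
    """Expand one cron field (e.g. '*/15') into the sorted values it matches."""
    values = set()
    for part in field.split(","):
        rng, _, step = part.partition("/")
        step = int(step) if step else 1
        if rng == "*":
            start, end = low, high
        elif "-" in rng:
            start, end = (int(x) for x in rng.split("-"))
        else:
            start = end = int(rng)
        values.update(range(start, end + 1, step))
    return sorted(values)


def matches(expression, minute, hour):
    """Check only the minute and hour fields of a five-field cron expression."""
    minute_field, hour_field, *_ = expression.split()
    return (
        minute in expand_cron_field(minute_field, 0, 59)
        and hour in expand_cron_field(hour_field, 0, 23)
    )


run_minutes = expand_cron_field("*/15", 0, 59)
runs_at_0930 = matches("*/15 * * * *", minute=30, hour=9)
runs_at_0931 = matches("*/15 * * * *", minute=31, hour=9)
```

For the example schedule, the minute field expands to 0, 15, 30, and 45, so the pipeline would run four times in every hour.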

DBT Models

  • dbt_model: The list of dbt models that are run to create the dedicated silver layer tables. This list remains unchanged unless new dbt models are added to the data platform.
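As a rough illustration of how a dbt_model list could be turned into a dbt invocation, the sketch below builds the argument list for dbt's run command with its --select option. How the Data Platform actually invokes dbt is not described in this document, so build_dbt_command is a hypothetical helper, and the command is only constructed here, not executed.

```python
import shlex

# Model names taken from the dbt_model list in the configuration.
DBT_MODELS = [
    "agent_messages_silver",
    "customer_messages_silver",
    "bot_messages_silver",
    "action_messages_silver",
]


def build_dbt_command(models):
    """Build an argv list that runs only the listed dbt models."""
    if not models:
        raise ValueError("dbt_model must list at least one model")
    return ["dbt", "run", "--select", *models]


command = build_dbt_command(DBT_MODELS)
# Shell-safe string form, useful for logging the command.
printable = shlex.join(command)
```

Passing the models as separate arguments after --select asks dbt to run the union of those models, so adding a new silver model to the list would be enough to include it in the next run.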
