Activities Data Pipeline
The Activities Data Pipeline is responsible for managing and processing all the conversation activities related data from CX. It extracts data from sources (MongoDB) applies necessary transformations to align with schema requirements, and loads the data into target database (MySQL). Key tasks include:
Validating data fields for accuracy and ensuring required fields are present.
Mapping data structure fields to the target schema.
Handling data updates and upserts to maintain data integrity and avoid data duplication.
The Activities Data Pipeline consists of two parallel pipelines, each responsible for processing a specific subset of the data:
These pipelines operate independently but are orchestrated together for cohesive data handling.
Configurations
Configurations for Activities Data Pipelines are provided in a yaml
format to ensure flexibility and adaptability. These configurations are designed for normal and ideal use cases and are advised to be used as-is to achieve optimal results.
source:
type: "mongodb"
connection_string: "mongodb://root:Expertflow123@mongo-mongodb.ef-external.svc:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
replica_set_enabled: false # get from global configs ${MONGODB_REPLICASET_ENABLED}
replica_set: "expertflow" # # get from global configs MONGODB_REPLICASET
read_preference : "secondaryPreferred" # get from global configs MONGODB_READ_PREFERENCE
queries:
voice_activities:
database: "conversation-manager_db"
collection_name: "ConversationActivities"
filter: {"activity.name": "VOICE_ACTIVITY"}
replication_key: "recordCreationTime"
transformation: "transform_voice_activities_data"
num_batches: 50
query_keys:
voice_connector_activities:
database: "conversation-manager_db"
collection_name: "ConversationActivities"
filter: {"activity.eventEmitter.senderName": "CX-Voice-Connector"}
replication_key: "recordCreationTime"
transformation: "transform_voice_connector_data"
num_batches: 50
query_keys:
# TLS/SSL Configuration
tls: true # Set to false if you don't want to use TLS
tls_ca_file: "/transflux/config/certificates/mongo_certs/mongodb-ca-cert"
tls_cert_key_file: "/transflux/config/certificates/mongo_certs/client-pem" # Includes both client certificate and private key
batch_size: 30000 # Adjust as needed
target:
type: "mysql"
db_url: "mysql+pymysql://<your-db-username>:<password>@<host>:<port>/<mysql-db-name>"
#db_url: "mssql+pyodbc://<your-db-username>:<password>@<host>:<port>/<mssql-db-name>?<driver_name>"
enable_ssl: false # Enable or disable SSL connections
ssl_ca: "/transflux/config/certificates/mysql_certs/ca.pem"
ssl_cert: "/transflux/config/certificates/mysql_certs/client-cert.pem"
ssl_key: "/transflux/config/certificates/mysql_certs/client-key.pem"
configdb:
type: "mysql"
db_url: "mysql+pymysql://<your-db-username>:<password>@<host>:<port>/<mysql-db-name>"
enable_ssl: false
ssl_ca: "/transflux/config/certificates/mysql_certs/ca.pem"
ssl_cert: "/transflux/config/certificates/mysql_certs/client-cert.pem"
ssl_key: "/transflux/config/certificates/mysql_certs/client-key.pem"
transformation:
schedule_interval: "*/45 * * * *"
Source Configuration
This section defines the MongoDB source settings for data extraction.
type
: Specifies the data source type.
Example:"mongodb"
indicates that MongoDB is the source.connection_string
:
The connection string for MongoDB. It includes the following components:username
andpassword
for authentication.MongoDB host and port.
Optional parameters like
authSource
,tls
, andtlsAllowInvalidHostnames
.
Example (non-replica):
CODEconnection_string: "mongodb://root:Expertflow123@mongo-mongodb.ef-external.svc:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
Example (replica set support):
CODEconnection_string: "mongodb://root:Expertflow123@mongo-mongodb-0.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-1.mongo-mongodb-headless.ef-external.svc.cluster.local:27017,mongo-mongodb-2.mongo-mongodb-headless.ef-external.svc.cluster.local:27017/?authSource=admin&tls=true&tlsAllowInvalidHostnames=true"
replica_set_enabled
:
Indicates if the replica set is enabled.
Example:false
replica_set
:
Specifies the name of the replica set if enabled.
Example:"expertflow"
read_preference
:
Defines the read preference for MongoDB.
Example:"secondaryPreferred"
allows reading from secondary replicas.queries
:
A dictionary containing query configurations for different pipelines. These queries would be different for each of the data pipelines. The respective queries for each pipeline is given in each of their respective document.your_target_table_name
: Configurations for the voice activities pipeline.database
: Name of the MongoDB database from where activities data is being extracted.collection_name
: Name of the MongoDB collection.filter
: Query filter applied to fetch data.replication_key
: Field used to track updates.transformation
: Transformation function name.num_batches
: Number of data batches.query_keys
: Reserved for gold queries ( for loading data in gold table if needed ).
TLS/SSL Configuration:
Enables secure communication with MongoDB.tls
: Set totrue
to enable TLS. Example:true
.tls_ca_file
: Path to the CA certificate file.
Example:"/transflux/certificates/mongo_certs/mongodb-ca-cert"
.tls_cert_key_file
: Path to the client certificate and private key file.
Example:"/transflux/certificates/mongo_certs/client-pem"
.
Batch Size
batch_size
:
Number of records processed per batch.
Example:30000
.
Target Configuration
This section defines the target database settings for data loading. The target database can be "mysql"
or "mssql"
depending upon your use case. Configurations would be different for both.
For MYSQL:
type
: Specifies the target database type.
Example:"mysql"
.db_url
: Connection string for the target MySQL database.
Format:"mysql+pymysql://<username>:<password>@<host>:<port>/<database>"
.
Example:"mysql+pymysql://user:Expertflow123@192.168.2.202:3306/campaigns_db"
.enable_ssl
: Enables SSL communication with the MySQL database. Can be true or false, depending on your database setup.
Example:true
.SSL Configuration:
ssl_ca
: Path to the CA certificate. Example:"/transflux/certificates/mysql_certs/ca.pem"
.ssl_cert
: Path to the client certificate. Example:"/transflux/certificates/mysql_certs/client-cert.pem"
.ssl_key
: Path to the client private key. Example:"/transflux/certificates/mysql_certs/client-key.pem"
.
For MSSQL:
type
: Specifies the target database type.
Example:"mssql"
.db_url
: Connection string for the target MSSQL database.
Format:"mssql+pyodbc://<your-db-username>:<password>@<host>:<port>/<mssql-db-name>?<driver_name>"
.
Example:"mssql+pyodbc://sa:Expertflow464@192.168.1.77:1433/testing_ali?driver=ODBC+Driver+17+for+SQL+Server"
enable_ssl
: Enables SSL communication with the MSSQL database. Can be true or false, depending on your database setup.
Example:true
.SSL Configuration:
ssl_ca
: Path to the CA certificate. Example:"/transflux/certificates/mssql_certs/ca.pem"
.ssl_cert
: Path to the client certificate. Example:"/transflux/certificates/mssql_certs/client-cert.pem"
.ssl_key
: Path to the client private key. Example:"/transflux/certificates/mssql_certs/client-key.pem"
.
Config Database
The configuration database (configdb
) stores metadata and operational settings for airflow.
Fields are identical to the target configuration.
Schedule Interval
schedule_interval
:
Cron expression defining the pipeline's schedule in Airflow.
Example:"*/45 * * * *"
(runs every 45 minutes).