
Executing Workflows Using Python

Overview:

This guide outlines how to execute a workflow from the Command Line Interface using a provided host URL, headers, and Workflow ID. The script triggers a workflow/rule, retrieves the execution status and result, and can save the result to an output JSON file.

Steps:

1. Pre-Requisites:

Note

For scripts, refer to the Sample Code Snippets section below.

Before initiating the workflow automation, ensure the following pre-requisites are met:

  • Python Environment: Ensure that Python is installed on the system where the automation will be performed.
  • iceDQ Configuration: Confirm that iceDQ is set up and configured correctly.
  • You need to have the runWorkflow.py file in a folder of your choice (refer to the Sample Code Snippets section below).
  • You need to have a lib folder containing rule.py. The lib folder must be located inside the folder that contains the runWorkflow.py file.


  • You need to have the config.py file in the same folder as the runWorkflow.py file.
  • You may have a folder to house the execution logs. The path of this folder is then specified with the -lf (Log Folder) argument during execution.
  • You may have a folder to house the execution details as output. The path of this folder is then specified with the -of (Output Folder Path) argument during execution.
  • You may want to specify the name of the output JSON file. This name is then passed with the -o (Output File Name) argument during execution.
  • If you do not specify the log folder, the output folder, or both, then by default the logs and/or output folders (default-logs and default-output) are created within the folder containing the runWorkflow.py file, and the output JSON file name defaults to DefaultOutput_<yyyymmdd>.json. A sketch of the resulting layout follows this list.
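
Assuming the default folder and file names used by the scripts below, the resulting folder layout looks roughly like this:

your-folder/
├── runWorkflow.py
├── config.py
├── lib/
│   └── rule.py
├── default-logs/        (created automatically when -lf is not given)
│   └── LogFile_<yyyymmdd>.log
└── default-output/      (created automatically when -of is not given)
    └── DefaultOutput_<yyyymmdd>.json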

2. Pre-Setup:

Prior to executing the workflows, perform the following pre-setup tasks:

  • Get Config Details: The configuration file config.py contains the required header payload. You need to enter your Authorization token and Workspace-Id values in it.
  • Workflow ID: Ensure that you have the workflow_Id of the workflow you want to execute.
  • Base URL: Ensure that you have the base URL of your iceDQ instance. An example of the filled-in values is shown after this list.
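
For example, with placeholder values filled in (both values below are illustrative, not real credentials), the relevant lines of config.py would read:

token = 'eyJhbGciOiJIUzI1NiIs...'   # placeholder access token
Workspace_id = '5f8a1b2c3d4e'       # placeholder workspace id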

3. Script Functionality Overview

This is an overview of the script's functionality:

  1. Script Structure:

    • Initialize values, parse options, read configuration, and set up dates.
    • Execute the workflow, log details, and handle output.
  2. Function Structure (execute_rule):

    • Defines the execute_rule function for triggering workflow runs.
    • Handles the HTTP POST request and returns the response.
  3. Function Structure (get_execution_result):

    • Defines the get_execution_result function for retrieving the execution status and result.
    • The main script calls it in a polling loop until the execution completes (see the condensed sketch after this list).
  4. Error Handling:

    • Manage unexpected input in the script and function parameters.
    • Use try-except blocks to log exceptions and handle HTTP response errors.
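
The following is a condensed sketch of that trigger-and-poll flow, using the two endpoints from the sample rule.py below (error handling, logging, and the GET request body are omitted; the host_url, headers, and workflow_id values are assumed to come from config.py and the command line):

import json
import time
import requests

def run_and_wait(host_url, headers, workflow_id):
    # Trigger the workflow run and capture its instance id
    trigger = requests.post(
        f"{host_url}/workflowruns:trigger",
        headers=headers,
        data=json.dumps({"objectId": workflow_id}),
    )
    instance_id = trigger.json()["instanceId"]

    # Poll the result endpoint until the run leaves the "running" state
    while True:
        result = requests.get(f"{host_url}/workflowruns/{instance_id}/result", headers=headers)
        status = result.json()["status"]
        if status != "running":
            return status, result.json()
        time.sleep(5)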

4. Command Line Arguments

Below are the Command Line Arguments relevant to executing Workflows:

Flag      Note
-H        The host URL.
-id       The ID of the rule/workflow to be executed.
-async    Asynchronous execution flag, either True or False.
-o        Output file name for saving the execution result.
-of       Output folder path.
-lf       Log folder path.

This is how you can execute an iceDQ Workflow using a Python script through the Command Line Interface. The following is the full standard format:

python runWorkflow.py [-H HOSTURL] [-id WORKFLOW_ID] [-async ASYNCHRONOUS] [-o OUTPUT] [-of OUTPUT_FOLDER] [-lf LOG_FOLDER]
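
For example, a typical invocation might look like this (the host URL and workflow ID below are placeholder values):

python runWorkflow.py -H http://api.icedq.net/api/v1 -id 64a1b2c3d4e5f6 -o result.json -of ./output -lf ./logs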

The following is the output you will receive if the execution is successful:

dd-Mon-yyyy hh:mm:ss [INFO] Execution Rule:{"instanceId":#####}
dd-Mon-yyyy hh:mm:ss [INFO] Status: running
dd-Mon-yyyy hh:mm:ss [INFO] Rule is still Running...
....
....
dd-Mon-yyyy hh:mm:ss [INFO] Status: success
dd-Mon-yyyy hh:mm:ss [INFO] Get the status of execution:{response json}
dd-Mon-yyyy hh:mm:ss [INFO] Execution is successful.
dd-Mon-yyyy hh:mm:ss [INFO] Execution result saved to {default output path}

This is the error you may get if the authorization fails (rule.py prints the response and exits):

{"code":"RequestUnauthorized","message":"Invalid bearer token for user."}

5. Post-Execution To-Dos

After executing the workflows, perform the following post-execution tasks:

  1. Review Logs: Analyze the script logs to identify any issues or anomalies during the execution.
  2. Result Verification: You may manually verify the test results in iceDQ (optional).
  3. Documentation: Archive the output JSON file to maintain a record of data quality over time (a sketch for inspecting it programmatically follows this list).
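
If you want to inspect the saved result programmatically rather than by hand, a minimal sketch might look like this (the dated file name below is hypothetical; adjust the path to match your -of and -o arguments):

import json

# Load the execution result saved by runWorkflow.py
with open("default-output/DefaultOutput_20240101.json") as f:  # hypothetical file name
    result = json.load(f)

# The top-level "status" field reflects the final workflow state
print("Status:", result["status"])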

Sample Code Snippets

rule.py

import requests
import json
import sys

# Execute Rule and get instance id
def execute_rule(hostUrl, headers, object_id):
    url = f"{hostUrl}/workflowruns:trigger"

    payload = json.dumps({
        "objectId": object_id
    })

    response = requests.post(url, headers=headers, data=payload)

    if response.status_code in [200, 202]:
        return response
    else:
        # Surface the error and stop; the caller cannot proceed without an instance id
        print(response.text)
        sys.exit(1)

# Get Execution Status
def get_execution_status(hostUrl, headers, id):
    url = f"{hostUrl}/workflowruns/{id}"

    response = requests.get(url, headers=headers)

    if response.status_code in [200, 202]:
        return response
    else:
        print(response.text)
        sys.exit(1)

# Get Execution Result
def get_execution_result(hostUrl, headers, object_id, id):
    url = f"{hostUrl}/workflowruns/{id}/result"

    payload = json.dumps({
        "objectId": object_id
    })

    response = requests.get(url, headers=headers, data=payload)

    if response.status_code in [200, 202]:
        return response
    else:
        print(response.text)
        sys.exit(1)
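
If you want to call these helpers outside runWorkflow.py, usage might look like this (the host URL and workflow ID are placeholders; headers comes from config.py, and the script is run from the folder containing lib):

import config
from lib import rule

host_url = "http://api.icedq.net/api/v1"   # placeholder base URL
workflow_id = "64a1b2c3d4e5f6"             # placeholder workflow_Id

run = rule.execute_rule(host_url, config.headerPayload, workflow_id)
instance_id = run.json()["instanceId"]
result = rule.get_execution_result(host_url, config.headerPayload, workflow_id, instance_id)
print(result.json()["status"])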

config.py

# hostUrl='http://api.icedq.net/api/v1'
token = 'enter accessToken here'
Workspace_id = 'enter workspace id here'
headerPayload = {
    'Accept': 'application/json',
    'Content-Type': 'application/json',
    'Workspace-Id': Workspace_id,
    'Authorization': 'Bearer ' + token
}

runWorkflow.py

from lib import rule
from time import sleep
import argparse
import datetime
import config
import logging
import os
import json
import sys

# Get the full path of the script file
script_path = os.path.abspath(__file__)

# Extract the file name without the extension
script_name = os.path.splitext(os.path.basename(script_path))[0]

# Check if no arguments are provided or if "--help" or "-h" is present
if len(sys.argv) == 1 or '--help' in sys.argv or '-h' in sys.argv:
    print(f"Usage: python {script_name}.py [-H HOSTURL] [-id WORKFLOW_ID] [-async ASYNCHRONOUS] [-o OUTPUT] [-of OUTPUT_FOLDER] [-lf LOG_FOLDER]")
    sys.exit(0)

# Define command-line arguments
parser = argparse.ArgumentParser()

parser.add_argument("-H", "--hostUrl", required=False, help="The host URL")

# Workflow id argument
parser.add_argument("-id", "--workflow_id", required=False, help="Your Rule Id")

# Asynchronous argument (argparse's type=bool treats any non-empty string,
# including "False", as True, so parse the string explicitly instead)
parser.add_argument("-async", "--asynchronous", required=False, default=False,
                    type=lambda s: s.lower() in ("true", "1", "yes"),
                    help="Asynchronous Status either True or False")

# Output file name argument
parser.add_argument("-o", "--output", required=False, help="Output file name")

# Output folder path argument
parser.add_argument("-of", "--output_folder", required=False, help="Output folder path")

# Log folder path argument
parser.add_argument("-lf", "--log_folder", required=False, help="Log folder path")

args = parser.parse_args()

# Get current date and time
logsTime = datetime.datetime.now()

# Create default folders for logs and output
log_directory = args.log_folder or "default-logs"
output_directory = args.output_folder or "default-output"

os.makedirs(log_directory, exist_ok=True)
os.makedirs(output_directory, exist_ok=True)

# Generate unique log file name using a timestamp
log_file = os.path.join(log_directory, f"LogFile_{logsTime.strftime('%Y%m%d')}.log")

# Configure Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt='%d-%b-%Y %H:%M:%S')

# Create a handler to log to the file
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt='%d-%b-%Y %H:%M:%S'))
logging.getLogger().addHandler(file_handler)

# Define variables from config file
headers = config.headerPayload

# Set a variable for command-line arguments
rule_id = args.workflow_id

# Set the default path for the output file
default_output_file = os.path.join(output_directory, f"DefaultOutput_{logsTime.strftime('%Y%m%d')}.json")

if args.hostUrl:
    host_url = args.hostUrl
else:
    logging.error("Error: You must provide 'hostUrl'")
    sys.exit(1)

asynchronous_flag = args.asynchronous

# Trigger the workflow; in asynchronous mode, exit without waiting for the result
execution_response = rule.execute_rule(host_url, headers, rule_id)
logging.info("Execution Rule:" + execution_response.text)
if asynchronous_flag:
    sys.exit(0)

instance_id = execution_response.json()["instanceId"]

# Continuously poll until the status leaves "running"
while True:
    status_response = rule.get_execution_result(host_url, headers, rule_id, instance_id)

    status = status_response.json()["status"]
    logging.info("Status: " + status)

    if status == "running":
        logging.info("Rule is still Running...")
        sleep(5)

    elif status == "success":
        logging.info("Get the status of execution:" + status_response.text)
        logging.info("Execution is successful.")
        break

    elif status == "error":
        error_description = status_response.json()["workflowInfo"]["activities"][0]["instance"]["error"]
        logging.error(f"Error Description: {error_description}")
        logging.error("Execution failed")
        sys.exit(1)

    else:
        logging.info(f"Execution status is {status}. Exiting...")
        logging.info("Execution Result:" + status_response.text)
        break

# Check if the output argument is provided
if args.output:
    json_file_path = os.path.join(output_directory, args.output)
else:
    json_file_path = default_output_file

try:
    with open(json_file_path, 'w') as json_file:
        json.dump(status_response.json(), json_file, indent=4)
    logging.info(f"Execution result saved to {json_file_path}")
except Exception as e:
    logging.error(f"Error while saving execution result to JSON file: {e}")

logging.shutdown()