Automating Kyligence Index Recommendation Feature After Analysing Pushdown Queries

Extending Kyligence Recommendation System for Ease of Use

Introduction

In this article we discuss Kyligence's existing AI-Augmented Recommendation System, which analyzes query patterns and suggests further model tuning (adding or altering indexes inside a model) for better cost-optimized query execution, reducing latency and letting the intelligent query router hit the appropriate index.

The recommendation system is also configurable, so that significant, heavy-weight query users and lower-priority use cases can be treated accordingly.

After a brief description of the recommendation system, we explain the gap that currently prevents it from being completely automated, with no human intervention.

Finally, we describe supplementary utility programs that bridge this gap and take Kyligence a step further toward user friendliness and fully automated handling of new queries, covering both model creation and index tuning, followed by data loading into the models.

Kyligence Recommendation System

The existing system is based on query-pattern analysis. Patterns are extracted from the query history. Kyligence's existing supported concepts of computed column, dimension, measure, and index lead to the following types of patterns:

  • Computed-column pattern
  • Dimension pattern
  • Measure pattern
  • Index pattern

While the index pattern is used for manipulating indices, the other patterns are used for adding computed columns, dimensions, and measures, respectively. To take advantage of the recommendation system and avoid tedious manual analysis of SQL queries, users need to turn on the following configuration.
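For reference, recommendation mode is typically switched on through a project-level property in kylin.properties. The property name below reflects the open-source Kylin semi-automatic mode and is an assumption here; verify it against the documentation for your Kyligence version.

```
## kylin.properties (assumed property name; check your version's documentation)
kylin.metadata.semi-automatic-mode=true
```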


Once this is turned on, users will see the recommendation dashboard on the Kyligence console home page, as shown below, including the total number of recommendations made, how many have already been accepted by users, how many are still pending, and so on.


From the users' perspective, patterns are transparent; only the recommendations made from pattern analysis are visible to users. Broadly, there are two types of index recommendation: adding new indices to serve queries better, or deleting unnecessary, least-used indices from models to optimize model build cost.

The Kyligence engine has built-in rules for identifying queries that have further scope for improvement, such as queries not hitting an exactly matching index group. A detailed description of the recommendation engine's internal principles is available in the Kyligence documentation at https://docs.kyligence.io/books/cloud/v4.5/en/Designers Guide/acceleration/basic_concept_actions.en.html

Existing Gap

Queries that don't find a matching model in the existing list of models currently escape the Kyligence recommendation engine, and no recommendation is made for them. However, Kyligence admin users can still import SQL directly, either by clicking “Advanced Mode” on the recommendation dashboard (as shown above) or by clicking “Import SQL” on the Model page of the console. Kyligence then gives the user the option of either creating a new model or adding an index to the most appropriate existing model.

Though there are plans to make this process completely automated, a small gap exists today, and I will describe how to bridge it in the following sections of this article.

Though there are existing options to import SQL for model creation, consider a situation where front-end BI users submit a list of 100 or more queries to the Kyligence administrator because all of those queries are being pushed down, taking much longer to execute and not really gaining any benefit from Kyligence.

The administrator's job becomes painful given the number of queries and their complexity.

The purpose of this article is to help Kyligence admin users avoid this painful task, and also to empower front-end BI users to handle the situation directly, without involving the administrator at all.

Bridging the Gap

Kyligence offers an extensive suite of RESTful APIs for developers to integrate its functionality into their downstream applications. In-depth descriptions of these APIs, including how to use them, are available in the Kyligence Developer's Guide documentation.

Utilizing a few of these existing APIs, we explain here how to bridge this gap and make the platform completely automated when BI users introduce a large number of new queries for their existing or new reports.

To create new models for this large number of new queries, we can use the Query History API and the very powerful Model Optimization API to generate model(s) from a list of SQL queries.
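The two API calls can be sketched with the requests library. The endpoint paths are taken from the configuration file in the Appendix; the host, credentials, and helper names here are illustrative only.

```python
import requests

HOST = "http://localhost:7070"  # placeholder host and port
HEADERS = {
    "Accept": "application/vnd.apache.kylin-v4-public+json",
    "Authorization": "Basic XXXX",  # placeholder credentials
    "Content-Type": "application/json;charset=utf-8",
}

def history_url(host, project, start_ms, end_ms):
    # Build the Query History API URL for a time window in epoch milliseconds.
    return (host + "/kylin/api/query/history_queries"
            + "?project=" + project
            + "&page_size=1000&limit=1000"
            + "&start_time_from=" + str(start_ms)
            + "&start_time_to=" + str(end_ms))

def suggest_models(host, project, sqls, headers):
    # Submit the collected pushdown SQLs to the model suggestion endpoint.
    body = {"project": project, "sqls": sqls}
    return requests.post(host + "/kylin/api/models/model_suggestion",
                         json=body, headers=headers)
```

The full, configurable version of these calls is in the model creation script in the Appendix.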

Implementation of this functionality

A simple workflow implementing the following tasks is described in the diagram below:
  • identify the SQL queries that are not hitting indices and are going to object storage, via the Query History API
  • prepare a list of valid pushdown queries for creating an appropriate model
  • finally, submit this list of queries to the Kyligence engine via the Model Optimization API
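The filtering step in this workflow boils down to a small predicate over each query-history record. The field names below are the ones used by the Query History API in the Appendix script.

```python
def is_pushdown(record):
    # A record counts as pushdown when the query succeeded, hit no index,
    # and was answered by the HIVE (pushdown) engine.
    return (record.get("query_status") == "SUCCEEDED"
            and not record.get("index_hit")
            and record.get("engine_type") == "HIVE")

def pushdown_sqls(history):
    # Keep the SQL pattern of every pushdown query, preserving order.
    return [r["sql_pattern"] for r in history if is_pushdown(r)]
```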

The actual program implementing this workflow (written in Python, though it could be written in any other language of choice, such as JavaScript) is attached in the Appendix section of this article.

The implementation uses a set of configurable parameters for better portability: the Kyligence engine host name or IP address, the port number for establishing a connection, the API syntax (in case it changes in later versions, users can update it in the configuration file without touching the program itself), and finally a configurable number of hours of query history to download when identifying new valid queries that are not hitting models and are going to object storage.

The purpose of the number-of-hours parameter is to avoid submitting the same pushdown queries multiple times for model creation.
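The window computation is a one-liner; this small helper mirrors the start_time_from / start_time_to parameters of the Query History API used in the Appendix script.

```python
import time

def history_window(hours, now_ms=None):
    # Return (start_ms, end_ms) covering the last `hours` hours in epoch
    # milliseconds, matching the Query History API's time parameters.
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - hours * 3600 * 1000, now_ms
```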

For example, if we configure this value as 24 hours, we can schedule the program to run daily at a fixed time via cron or any other scheduler. Users can thereby detect the previous day's pushdown queries, submit them for model creation, and repeat the process every day.

This is just a simple example implementation, which can certainly be improved with further exception handling and other robustness.

For illustration purposes, the functionality of this program is detailed below. The following is the query history analysis identifying pushdown queries.


Execution of the utility program and its output.

Once data is loaded into the created models, the same queries (used for model creation) are no longer answered by object storage, but instead hit the appropriate indices of the newly created model(s).
Now the model(s) are created automatically, but one task still remains for the administrator: loading data into the newly created models.

For complete automation of this use case (front-end BI users adding a large number of new queries), we use the Model Build API to load data into the new models (and existing ones as well).

For now, as a prototype of automating data load into models, we implemented the workflow described below.


Again, this is the simplest possible implementation, and there is plenty of scope for improving it. The actual script for this implementation is provided in the Appendix section of this document.

In this script, we collect the list of all models via the Get Model List API, check for any running job using the Get Job List API, and finally submit model build jobs sequentially, in a synchronous manner. Users can always modify this implementation according to the exact requirements of their environment.
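The "wait until idle" decision reduces to a check over the statuses returned by the Get Job List API; an empty job list counts as idle. A minimal sketch:

```python
def all_jobs_finished(job_statuses):
    # The next build is submitted only when every job reported by the
    # Get Job List API is already FINISHED; no jobs at all also means idle.
    return all(status == "FINISHED" for status in job_statuses)
```

The Appendix script calls the equivalent of this check in a polling loop, sleeping between attempts.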

The standard output of this job submission script is provided below.

The sequential execution of these data loading jobs is shown in the screenshot below.



Kyligence certainly has multi-tasking and parallelism built into its architecture. Users can always change this logic to submit jobs simultaneously; the example here is simply suited to a small-scale Spark cluster.
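If the cluster can absorb concurrent builds, the sequential loop could be swapped for a thread pool. This is a sketch only; submit_job is assumed to wrap the Model Build API call as in the Appendix script.

```python
from concurrent.futures import ThreadPoolExecutor

def submit_all(models, submit_job, max_workers=4):
    # Fire build jobs for all models concurrently instead of one by one.
    # submit_job(model_name) is assumed to perform one Model Build API call.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(submit_job, models))
```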

Summary

The purpose of this blog article is to help Kyligence admin users avoid the rigorous task of analyzing hundreds of SQL queries created by BI developers, then creating suitable models and designing appropriate indices manually. These utility scripts can be scheduled via cron or another scheduler to run at a user-defined frequency, such as daily or twice daily, according to users' requirements.

The existing Kyligence AI Augmentation and Recommendation engine's functions can be extended further via these scripts, utilizing the Kyligence RESTful APIs, for complete automation starting from
  • detecting new pushdown queries
  • creating new models for them, including appropriate indices
  • and finally loading data into those models
With this extension, Kyligence offers a completely automated platform that handles new queries by itself, building the correct models and indices for them and finally loading data into those models without any human intervention. Another aim of this blog is to show Kyligence users how easily they can build their downstream applications on this enriched repository of secure and efficient RESTful APIs.


Appendix

Automated model creation script

#!/usr/bin/python3

###### Revision 1 : This script detects pushdown queries from the query history ######
###### Revision 1 : and then calls the model suggestion API with the list of pushdown queries ######
###### Revision 2 : Added no. of hours as a configurable parameter; Date: 20th December 2022 ######

import time
import configparser
import requests

config = configparser.ConfigParser()
with open('cfg_model_create.ini') as f:
    config.read_file(f)
host = config.get('connection', 'host')
port = config.get('connection', 'port')
project = config.get('connection', 'project')
qry_hist_api = config.get('api', 'qry_hist_api')
model_create_api = config.get('api', 'model_create_api')
#model_validate_api = config.get('api', 'model_validate_api')
numberOfHours = config.get('api', 'numHours')

kylin_headers = {
    'Accept': config.get('header', 'Accept'),
    'Accept-Language': config.get('header', 'Accept-Language'),
    'Authorization': config.get('header', 'Authorization'),
    'Content-Type': config.get('header', 'Content-Type'),
}


def pushDownQueries():
    # Compute the [now - N hours, now] window in epoch milliseconds.
    current_time_ms = int(time.time() * 1000)
    hours = int(numberOfHours)
    window_start_ms = current_time_ms - hours * 3600000

    qry_hist_url = (host + ":" + port + "/" + qry_hist_api
                    + "?project=" + project
                    + "&page_size=1000&limit=1000"
                    + "&start_time_from=" + str(window_start_ms)
                    + "&start_time_to=" + str(current_time_ms))
    print("Sending Query History URL ==> " + qry_hist_url + "\n")
    qry_hist_response = requests.get(qry_hist_url, headers=kylin_headers)
    if qry_hist_response.status_code != 200:
        print('Error : ' + str(qry_hist_response.status_code))
        exit(1)
    print('API Response Status Code : ' + str(qry_hist_response.status_code) + ' Status : SUCCESS')

    queries = qry_hist_response.json()['data']['query_histories']
    qryCount = len(queries)
    print("QryCount: " + str(qryCount))

    sqls = []
    for query in queries:
        # A pushdown query succeeded, hit no index, and was answered by the HIVE engine.
        if (query['query_status'] == 'SUCCEEDED'
                and not query['index_hit']
                and query['engine_type'] == 'HIVE'):
            sqls.append(str(query['sql_pattern']))
    print("#################################")
    print("Out of total: " + str(qryCount) + " queries, # of queries answered by pushdown: " + str(len(sqls)))
    print("#################################")
    return sqls


pdQueries = pushDownQueries()
print("Number of Push Down Queries : " + str(len(pdQueries)))


def createModels(list_of_push_down_queries):
    model_create_url = host + ":" + port + "/" + model_create_api
    url_body = {
        "project": project,
        "sqls": list_of_push_down_queries,
    }
    print("Sending Model Create URL ==> " + model_create_url + "\n")
    model_create_response = requests.post(model_create_url, json=url_body, headers=kylin_headers)
    response_code = model_create_response.json()['code']
    print("API Response Received, Parsing Response")
    if str(response_code) != '000':
        print("Model create API fails")
        return False
    print("Model create API succeeds")
    return True


if createModels(pdQueries):
    print("Model Created Successfully")

Configuration file for the above script (cfg_model_create.ini)

#########################################################################
########Configuration parameters for create model via API #########
###### Author: Saikat Basu, Senior Solution Architect, Kyligence #########
######### Version 2: Added no. of hours config parameter
#########################################################################

[connection]
host = http://localhost
port = 7070
project = myProj1

[header]
Accept = application/vnd.apache.kylin-v4-public+json
Accept-Language = en
Content-Type = application/json;charset=utf-8
Authorization = Xxxxxxxxxxxxxxxxxxxxxxxx

[api]
model_create_api = kylin/api/models/model_suggestion
model_validate_api = kylin/api/models/model_validation
qry_hist_api = kylin/api/query/history_queries
numHours = 24

Automated data load script

#!/usr/bin/python3
###### Author: Saikat Basu, Senior Solution Architect, Kyligence ######
###### Revision 1 : Submitting index build jobs ######

import time
import configparser
import requests
from sys import exit

config = configparser.ConfigParser()
with open('cfg_jobsubmission.ini') as f:
    config.read_file(f)
host = config.get('connection', 'host')
port = config.get('connection', 'port')
project = config.get('connection', 'project')
model_api = config.get('api', 'model_api')
job_api = config.get('api', 'job_api')

kylin_headers = {
    'Accept': config.get('header', 'Accept'),
    'Accept-Language': config.get('header', 'Accept-Language'),
    'Authorization': config.get('header', 'Authorization'),
    'Content-Type': config.get('header', 'Content-Type'),
}


def getModels():
    # Fetch the names of all models in the project via the Get Model List API.
    model_api_url = host + ":" + port + "/" + model_api + "?project=" + project
    model_api_response = requests.get(model_api_url, headers=kylin_headers)
    if model_api_response.status_code != 200:
        print("Get model list API call failed, exiting ...")
        exit(1)
    models = model_api_response.json()['data']['value']
    return [str(model['name']) for model in models]


def submitJob(model_name):
    # Submit a data load (segment build) job for one model via the Model Build API.
    build_api_url = host + ":" + port + "/" + model_api + "/" + model_name + "/segments"
    url_body = {"project": project}
    build_api_response = requests.post(build_api_url, json=url_body, headers=kylin_headers)
    if build_api_response.status_code != 200:
        print("Model load API call failed, exiting ...")
        exit(1)
    jobs = build_api_response.json()['data']['jobs']
    if len(jobs) > 0:
        print("Job submitted successfully")
    else:
        print("Job submission failed")


def getJobs():
    # Return True when no job reported by the Get Job List API is still running.
    job_api_url = host + ":" + port + "/" + job_api
    job_api_response = requests.get(job_api_url, headers=kylin_headers)
    if job_api_response.status_code != 200:
        print("Job list API call failed, exiting ...")
        exit(1)
    jobs = job_api_response.json()['data']['value']
    for job in jobs:
        if job['job_status'] != 'FINISHED':
            return False
    return True


myModels = getModels()
print("Number of models = " + str(len(myModels)))

for myModel in myModels:
    # Wait for any running job to finish before submitting the next build
    # (synchronous, sequential submission).
    while True:
        time.sleep(150)
        if getJobs():
            break
        print("Jobs running, waiting ...")
        time.sleep(120)
    print("Submitting data load job for model ==> " + myModel)
    submitJob(myModel)

Configuration file for the above script (cfg_jobsubmission.ini)

#########################################################################################
########Configuration parameters for the data load (job submission) script
########Author: Saikat Basu, Senior Solution Architect, Kyligence Inc.

#########################################################################################
######## Only change host, workspace name, project name and Authorization string ########

[connection]
host = http://localhost
port = 7070
project = myProj1

[api]
model_api = kylin/api/models
job_api = kylin/api/jobs?time_filter=0&page_size=1

[header]
Accept = application/vnd.apache.kylin-v4-public+json
Accept-Language = en
Content-Type = application/json;charset=utf-8
Authorization = Xxxxxxxxxxxxxxxxxxxxxxxx
