greendev
Loading

Using Python to code SQL

My approach to SQL is to chunk everything down. Long convoluted scripts of several joins, multiple CTEs and subqueries make it harder for colleagues to update and edit. This results in technical debt. Additionally, as a Data Engineer I do not like or use the function DISTINCT. It is slow and lacks the same control as row_number(), however analysts and developers can be lazy and use distinct everywhere but when it comes to productionising code Server response is laboured with DISTINCT.

The following SQL was generated with Python. It works on the basis that Python runs a query against the table I have just created (which will always have a number suffix) and creates the following: Drop table… Select into… From… Subquery with row_number()...

drop table if exists rhe_debt.dbo.tmp_t_cerberus_debt_port_tick_assign_4
;
select    a.analysis_date
,         a.accountFk
,         a.customerFk
,         a.account_number
,         a.cus_name
,         a.forename
,         a.surname

 
into      rhe_debt.dbo.tmp_t_cerberus_debt_port_tick_assign_4
 
from
(
 
select   a.*
,        row_number() over (partition by a.accountfk order by a.from_date desc) rn
/* Add join here */
 
from     rhe_debt.dbo.tmp_t_cerberus_debt_port_tick_assign_3 a
) a
where a.rn = 1

The point of this is to use this staging table to join onto another table that will be inside a subquery. The usage of the subquery in conjunction with the row_number is to remove duplicates of the join. It gives me far more control over how I want to partition and order the data. By updating the table that I have just completed with the changed number suffix (see cell variable below), I am able to chain together stages very quickly, with very specific joins. I used this process to build a debt placement system in the space of three weeks, with testing always coming back positive.

from sqlalchemy import create_engine
import pyodbc
import urllib

#Connect to Server and Database
server = 'xxxxxxxxx'
database = 'xxxxxxxxx'
UID = 'xxxxxxxxxx'
PWD = 'xxxxxxxxxx'


params = urllib.parse.quote_plus("".join(['DRIVER={SQL Server Native Client 11.0};SERVER=',
                                 server,';DATABASE=',database,';UID=',UID,';PWD=',PWD,';']))

engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
                                    
cell = "dbo.tmp_t_cerberus_debt_port_tick_assign_3"
            
import pandas as pd

query = 'select * from ' + cell + ' where 1=0' 

df = pd.read_sql(query,engine)

s = [char for char in cell]
for i,val in reversed(list(enumerate(s))):
    try:
        if s[i].isdigit() and s[i-1].isdigit():
            t =  s[i - 1] + s[i]
        elif (s[i].isdigit()) and (not s[i+1].isdigit()):
            t = s[i]
    except IndexError:
        t = s[i]
        continue

x = str(t)
z = int(t)+1
y = str(z)

texta = ''.join(["drop table if exists " , cell[:-len(y)],y,'\n','\t\t\t',";",'\n'])

texta += ''.join(['\t\t\t','select '])

for i,key in enumerate(df.keys()):
    
    if i == 0:
        texta += ''.join([ ' \t',"a.", key,'\n'])
    else:    
        texta += ''.join(['\t\t\t', ',' , '\t\t\t', "a.", key,'\n'])
        
texta += ''.join(['\n','\t\t\t',"into" ,'\t\t', cell[:-len(y)], y,'\n\n'])

texta += ''.join(['\t\t\t',"from",'\n\n\t\t\t','(','\n\n\t\t\t','select','\t\t','a.*'])

texta += ''.join(['\n\t\t\t',',','\t\t\trow_number() over (partition by a.accountfk order by a.from_date desc) rn '])

texta += ''.join(['\n\n\t\t\tfrom\t\t',cell,' a'])

texta += ''.join(['\n\t\t\t) a\n\t\t\twhere a.rn = 1'])

import os

text_file = open("select_statement.txt", "w")
n = text_file.write(texta)
text_file.close()

os.system("notepad.exe " + "select_statement.txt")