Python: Create an ETL with Luigi, Pandas and SQLAlchemy

Photo by Nicole Baster on Unsplash

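What is ETL?

ETL stands for Extract, Transform, Load. Data is extracted from one or more sources, transformed (cleaned, joined, aggregated), and loaded into a target. The target can be a database or a data warehouse, or, as in this article, a CSV report.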
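To make the three stages concrete, here is a minimal sketch that uses only Python's standard library (sqlite3 and csv) instead of pandas. The function names extract, transform and load are illustrative and do not come from any library. The sketch builds in-memory copies of the two sample tables from the Preparation step and inner-joins them on id.

```python
from __future__ import annotations

import csv
import io
import sqlite3
from typing import TextIO


def extract(conn: sqlite3.Connection, query: str) -> list[dict]:
    """Extract: run a query and return each row as a column->value dict."""
    cur = conn.execute(query)
    columns = [d[0] for d in cur.description]
    return [dict(zip(columns, row)) for row in cur.fetchall()]


def transform(names: list[dict], salaries: list[dict]) -> list[dict]:
    """Transform: inner-join the two row sets on 'id'."""
    salary_by_id = {row["id"]: row["salary"] for row in salaries}
    return [
        {**row, "salary": salary_by_id[row["id"]]}
        for row in names
        if row["id"] in salary_by_id  # ids missing on either side are dropped
    ]


def load(rows: list[dict], out: TextIO) -> None:
    """Load: write the joined rows as CSV to a text stream."""
    writer = csv.DictWriter(out, fieldnames=["id", "first_name", "last_name", "salary"])
    writer.writeheader()
    writer.writerows(rows)


def demo() -> str:
    # In-memory copies of the sample data used for db1 and db2 in this article.
    db1 = sqlite3.connect(":memory:")
    db1.execute("CREATE TABLE names (id varchar(10) PRIMARY KEY, first_name text, last_name text)")
    db1.executemany("INSERT INTO names VALUES (?, ?, ?)", [("2", "john", "doe"), ("3", "jenny", "doe")])
    db2 = sqlite3.connect(":memory:")
    db2.execute("CREATE TABLE salaries (id varchar(10) PRIMARY KEY, salary integer)")
    db2.executemany("INSERT INTO salaries VALUES (?, ?)", [("1", 10000), ("2", 13000), ("3", 23000)])

    names = extract(db1, "SELECT * FROM names ORDER BY id")
    salaries = extract(db2, "SELECT * FROM salaries ORDER BY id")
    db1.close()
    db2.close()

    buf = io.StringIO()
    load(transform(names, salaries), buf)
    return buf.getvalue()


if __name__ == "__main__":
    print(demo())
```

The rest of this article does the same three steps, with pandas doing the extract and transform and Luigi deciding when each step runs.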

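What is Luigi?

Luigi is a Python package, originally developed at Spotify, for building pipelines of batch jobs. Each step is a Task that declares its dependencies (requires), its result (output) and its work (run). Luigi resolves the dependency graph and skips any task whose output already exists.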
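Luigi's core scheduling rule can be modeled in a few lines: a task runs only if its output does not exist yet, and only after its requirements are complete. The sketch below is a hypothetical toy. ToyTask and build are invented names, and a Python set stands in for the filesystem. It is not how Luigi is implemented; it only illustrates the behavior.

```python
from __future__ import annotations


class ToyTask:
    """Hypothetical stand-in for a Luigi task; not Luigi's real implementation."""

    def __init__(self, name: str, output: str, deps: tuple[ToyTask, ...] = ()):
        self.name = name
        self.output = output  # the path this task "writes"
        self.deps = deps

    def requires(self) -> list[ToyTask]:
        return list(self.deps)

    def complete(self, files: set[str]) -> bool:
        # Mirrors Luigi's default rule: a task is complete when its output exists.
        return self.output in files

    def run(self, files: set[str], log: list[str]) -> None:
        log.append(self.name)
        files.add(self.output)


def build(task: ToyTask, files: set[str], log: list[str]) -> None:
    """Run incomplete requirements depth-first, then the task itself."""
    if task.complete(files):
        return
    for dep in task.requires():
        build(dep, files, log)
    task.run(files, log)


def demo() -> list[list[str]]:
    q1 = ToyTask("QueryDB1", "DB1_output.csv")
    q2 = ToyTask("QueryDB2", "DB2_output.csv")
    report = ToyTask("CreateReport", "Report.csv", (q1, q2))
    files: set[str] = set()
    runs: list[list[str]] = []
    for _ in range(2):  # the first run does everything, the second does nothing
        log: list[str] = []
        build(report, files, log)
        runs.append(log)
    files.discard("Report.csv")  # delete only the final report...
    log = []
    build(report, files, log)  # ...so only CreateReport runs again
    runs.append(log)
    return runs


if __name__ == "__main__":
    for i, log in enumerate(demo(), 1):
        print(f"run {i}: {log}")
```

The second run is a no-op because every output already exists. This is why deleting an output file is the usual way to make Luigi redo a step.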

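What is Pandas?

pandas is a Python library for data analysis built around the DataFrame, a two-dimensional labeled table. In this article, pandas reads SQL query results and CSV files, joins them, and writes CSV files.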

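What is SQLAlchemy?

SQLAlchemy is a Python SQL toolkit and object-relational mapper. Here it only supplies the database engine (create_engine) that pandas uses to run the queries.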
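Here is a minimal sketch of SQLAlchemy used on its own, assuming SQLAlchemy 1.4 or later. create_engine builds an engine from a database URL, and text() wraps raw SQL. The sketch uses an in-memory database ("sqlite://") rather than the article's db1 file. pd.read_sql_query accepts the same kind of engine object.

```python
from sqlalchemy import create_engine, text

# "sqlite://" with no path is an in-memory database (an assumption for this demo).
# The article's 'sqlite:///db1' instead points at a file named db1 in the current directory.
engine = create_engine("sqlite://")

# engine.begin() opens a transaction and commits it when the block exits without error.
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE names (id varchar(10) PRIMARY KEY, first_name text, last_name text)"))
    conn.execute(
        text("INSERT INTO names VALUES (:id, :first, :last)"),
        [{"id": "2", "first": "john", "last": "doe"}, {"id": "3", "first": "jenny", "last": "doe"}],
    )

with engine.connect() as conn:
    rows = [tuple(r) for r in conn.execute(text("SELECT * FROM names ORDER BY id"))]

print(rows)
```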

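How can we combine Luigi, Pandas and SQLAlchemy?

Luigi orchestrates the tasks and their dependencies, and SQLAlchemy connects to the databases. pandas runs the queries, joins the results and writes the CSV files.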

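Scenario:

We have two SQLite databases. db1 holds a names table (id, first_name, last_name), and db2 holds a salaries table (id, salary). We want a single CSV report that joins both tables on id.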

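Preparation:

Install the required libraries, then create the two test databases with the sqlite3 command-line shell: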

$ sudo pip3 install pandas
$ sudo pip3 install sqlalchemy
$ sudo pip3 install luigi
$ sqlite3 db1
create table names (id varchar(10) primary key, first_name text, last_name text);
insert into names values('2','john','doe');
insert into names values('3','jenny','doe');
$ sqlite3 db2
create table salaries (id varchar(10) primary key, salary integer);
insert into salaries values('1',10000);
insert into salaries values('2',13000);
insert into salaries values('3',23000);
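If you prefer not to type these statements into the sqlite3 shell, you can create the same two databases with Python's built-in sqlite3 module. This is a sketch, and the function name create_test_databases is hypothetical. It deviates from the shell session by using CREATE TABLE IF NOT EXISTS and INSERT OR REPLACE, so running it twice does not fail.

```python
import sqlite3
from contextlib import closing


def create_test_databases(db1_path: str = "db1", db2_path: str = "db2") -> None:
    """Create (or refresh) the sample names and salaries databases."""
    with closing(sqlite3.connect(db1_path)) as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS names "
            "(id varchar(10) PRIMARY KEY, first_name text, last_name text)"
        )
        conn.executemany(
            "INSERT OR REPLACE INTO names VALUES (?, ?, ?)",
            [("2", "john", "doe"), ("3", "jenny", "doe")],
        )
        conn.commit()
    with closing(sqlite3.connect(db2_path)) as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS salaries (id varchar(10) PRIMARY KEY, salary integer)"
        )
        conn.executemany(
            "INSERT OR REPLACE INTO salaries VALUES (?, ?)",
            [("1", 10000), ("2", 13000), ("3", 23000)],
        )
        conn.commit()


if __name__ == "__main__":
    create_test_databases()  # writes db1 and db2 in the current directory
```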
Solution:

Save the following script as etl.py:

#!/usr/bin/env python3
import csv

import luigi
import pandas as pd
from sqlalchemy import create_engine


class QueryDB1(luigi.Task):
    """Extract the names table from db1 into a CSV file."""

    def requires(self):
        return []  # no upstream dependencies

    def output(self):
        return luigi.LocalTarget("DB1_output.csv")

    def run(self):
        engine = create_engine("sqlite:///db1")
        results = pd.read_sql_query("SELECT * FROM names", engine)
        # LocalTarget.open('w') writes to a temporary file and moves it into
        # place on a clean close, so a failed run leaves no partial output.
        with self.output().open("w") as f:
            results.to_csv(f, index=False, header=True, quoting=csv.QUOTE_NONNUMERIC)


class QueryDB2(luigi.Task):
    """Extract the salaries table from db2 into a CSV file."""

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("DB2_output.csv")

    def run(self):
        engine = create_engine("sqlite:///db2")
        results = pd.read_sql_query("SELECT * FROM salaries", engine)
        with self.output().open("w") as f:
            results.to_csv(f, index=False, header=True, quoting=csv.QUOTE_NONNUMERIC)


class CreateReport(luigi.Task):
    """Join both extracts on id and write the final report."""

    def requires(self):
        return [QueryDB1(), QueryDB2()]

    def output(self):
        return luigi.LocalTarget("Report.csv")

    def run(self):
        # self.input() returns the outputs of requires(), in the same order,
        # so the file names are not hard-coded a second time.
        names_target, salaries_target = self.input()
        with names_target.open("r") as f:
            df1 = pd.read_csv(f, header=0)
        with salaries_target.open("r") as f:
            df2 = pd.read_csv(f, header=0)
        # Inner join: keep only ids present in both tables.
        df3 = pd.merge(df1, df2, how="inner", on=["id"])
        with self.output().open("w") as f:
            df3.to_csv(f, index=False, header=True, quoting=csv.QUOTE_NONNUMERIC)


if __name__ == "__main__":
    # local_scheduler=False: submit the tasks to the central scheduler (luigid).
    luigi.run(main_task_cls=CreateReport, local_scheduler=False)
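The transform step in CreateReport is a single pandas merge. The sketch below runs the same inner join in memory on the sample rows from Preparation, without Luigi, databases or CSV files. The ids are written as integers, which is what pd.read_csv typically infers from the extract files. The sketch also adds an outer merge with indicator=True to show which ids the inner join drops.

```python
import pandas as pd

# Sample rows from the Preparation step (assumption: ids already parsed as integers).
names = pd.DataFrame({"id": [2, 3], "first_name": ["john", "jenny"], "last_name": ["doe", "doe"]})
salaries = pd.DataFrame({"id": [1, 2, 3], "salary": [10000, 13000, 23000]})

# Same call as CreateReport: only ids found in both frames survive.
inner = pd.merge(names, salaries, how="inner", on=["id"])

# Outer merge with indicator=True adds a "_merge" column
# ("both", "left_only" or "right_only") that shows where each row came from.
outer = pd.merge(names, salaries, how="outer", on=["id"], indicator=True)
dropped_ids = outer.loc[outer["_merge"] != "both", "id"].tolist()

print(inner)
print("ids dropped by the inner join:", dropped_ids)
```

Here id 1 appears only in salaries, so the report contains two rows (john and jenny). If the report should list every salary, use how="left" with salaries as the left frame instead.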

Start the Luigi central scheduler (luigid), which also serves the web dashboard at http://localhost:8082 by default, then execute the pipeline:

$ nohup luigid &
$ python3 etl.py

DevOps engineer, loves Linux, Python, cats and Amiga computers
