Project structure
We need a project structure that lets us build modular jobs and reuse components across different jobs. Our application is ultimately an ETL, so we will structure it as such:
apps
└── src
    ├── main
    │   ├── extract
    │   │   ├── extract.py
    │   │   └── __init__.py
    │   ├── __init__.py
    │   ├── load
    │   │   ├── __init__.py
    │   │   └── load.py
    │   ├── main.py
    │   └── transform
    │       ├── __init__.py
    │       └── transform.py
    └── test
        └── test.py
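To see how the pieces fit together, here is a minimal sketch of what main.py could look like; the app name, file path, table name and connection settings are illustrative assumptions, not values from the real project:

from pyspark.sql import SparkSession

from extract.extract import Extract
from transform.transform import Transform
from load.load import Load

if __name__ == "__main__":
    spark = SparkSession.builder.appName("etl-job").getOrCreate()

    extract = Extract(spark)
    transform = Transform(spark)
    load = Load(spark)

    # Illustrative flow: read a CSV, apply the business logic, persist the result.
    raw_df = extract.read_csv("data/input.csv")
    result_df = raw_df  # placeholder until the Transform class gets real methods
    load.load_csv_db(result_df, password="secret", db_table="public.results",
                     url="jdbc:postgresql://localhost:5432/etl", user="etl_user",
                     mode="append")

    spark.stop()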
Extract
The extract module has a class that contains all the utilities we need to get data from different sources.
from pyspark.sql import SparkSession, DataFrame

class Extract:
    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark
Reading a table from the database
This class method reads a specific table from a specific database:
    def read_table_db(self, password: str, db_table: str, url: str, user: str) -> DataFrame:
        tableDataFrame = self.spark.read.format("jdbc") \
            .option("url", url) \
            .option("dbtable", db_table) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", "org.postgresql.Driver") \
            .load()
        return tableDataFrame
We can make this function read from different databases by making the driver an argument, as sketched below.
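As a rough sketch of that variant (the MySQL driver class is only an example of an alternative value):

    def read_table_db(self, password: str, db_table: str, url: str, user: str,
                      driver: str = "org.postgresql.Driver") -> DataFrame:
        # With the driver as a parameter the same method can read from other
        # databases, e.g. driver="com.mysql.cj.jdbc.Driver" for MySQL.
        return self.spark.read.format("jdbc") \
            .option("url", url) \
            .option("dbtable", db_table) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", driver) \
            .load()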
Reading a CSV file
This class method reads a CSV file. If we provide a JSON schema file, it returns a DataFrame that follows that schema; otherwise it lets Spark infer the schema:
    def read_csv(self, file_name: str, sep: str = ",", header: str = "true", schema_file: str = "") -> DataFrame:
        if schema_file == "":
            # No schema file provided: let Spark infer the column types.
            return self.spark.read.format("csv") \
                .option("header", header) \
                .option("sep", sep) \
                .option("inferSchema", "true") \
                .load(file_name)
        else:
            return self.spark.read.format("csv") \
                .option("header", header) \
                .option("sep", sep) \
                .schema(self.get_schema(schema_file)) \
                .load(file_name)
    def get_schema(self, schema_path: str) -> StructType:
        # Requires `import json` and `from pyspark.sql.types import StructType` at module level.
        with open(schema_path) as f:
            d = json.load(f)
        schema = StructType.fromJson(d)
        return schema
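As a hypothetical usage example (the file names and columns are invented), the schema file is simply a JSON-serialized StructType, for instance the output of df.schema.jsonValue():

# people_schema.json could contain, for example:
# {"type": "struct", "fields": [
#     {"name": "id", "type": "integer", "nullable": false, "metadata": {}},
#     {"name": "name", "type": "string", "nullable": true, "metadata": {}}]}
extract = Extract(spark)
people_df = extract.read_csv("people.csv", sep=";", schema_file="people_schema.json")
people_df.printSchema()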
Load
The load module has a class Load that provides utilities for loading the results of a transformation into a database or any other storage relevant to our business needs. Here we will persist the results in the PostgreSQL database.
from pyspark.sql import SparkSession, DataFrame

class Load:
    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark
Loading a file to the database
This utility loads a DataFrame into the database:
    def load_csv_db(self, df: DataFrame, password: str, db_table: str, url: str, user: str, mode: str = "default"):
        df.write \
            .format("jdbc") \
            .option("url", url) \
            .option("dbtable", db_table) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", "org.postgresql.Driver") \
            .mode(mode) \
            .save()
The mode argument can take the following values:
- "append": append data to the existing table, creating the table if it does not exist
- "overwrite": overwrite existing data in the table
- "error" or "errorifexists": throw an exception if data already exists
- "ignore": silently ignore the write if data already exists
The default value "default" behaves like "errorifexists".
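For example, a hypothetical call that overwrites a reporting table on every run (the connection details are illustrative):

load = Load(spark)
load.load_csv_db(
    df=result_df,
    password="secret",
    db_table="public.sales_summary",
    url="jdbc:postgresql://localhost:5432/etl",
    user="etl_user",
    mode="overwrite",  # replace the existing table contents
)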
Transform
The transform module contains the business logic that effectively transforms the data and creates the insights needed by the end users. In this section we only explain the structure; we dive into examples of transformations in the dedicated article transform.
Transform class
The class also relies on a SparkSession as the entry point to Spark. It contains the business logic as class methods.
from pyspark.sql import SparkSession, DataFrame

class Transform:
    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark
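For illustration only, a business-logic method could be added to this class as follows; the column names and aggregation are invented, and the real transformations are covered in the dedicated article:

    def total_sales_by_country(self, sales_df: DataFrame) -> DataFrame:
        # Hypothetical rule: aggregate the "amount" column per "country".
        return sales_df.groupBy("country").sum("amount")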
Conclusion
The above structure enables us to approach the business logic in a modular way. We will explore the transform module further in the following article Transform module.