Flight dataset project


Project structure

We need a project structure that lets us build modular jobs and reuse components across jobs. Our application is ultimately an ETL, so we will structure it as such (a sketch of how main.py ties the modules together follows the tree):

apps
│   └── src
│       ├── main
│       │   ├── extract
│       │   │   ├── extract.py
│       │   │   └── __init__.py
│       │   ├── __init__.py
│       │   ├── load
│       │   │   ├── __init__.py
│       │   │   └── load.py
│       │   ├── main.py
│       │   └── transform
│       │       ├── __init__.py
│       │       └── transform.py
│       └── test
│           └── test.py
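The main.py file is the entry point that wires the three modules together. Below is a minimal sketch of what it could look like; the import paths assume src is on the Python path, and the app name, file name, table and connection details are placeholders rather than the actual job:

from pyspark.sql import SparkSession

# assuming src is on the Python path
from main.extract.extract import Extract
from main.load.load import Load
from main.transform.transform import Transform


def run() -> None:
    spark = SparkSession.builder.appName("flights-etl").getOrCreate()

    extract = Extract(spark)
    transform = Transform(spark)  # the transformations are covered in the transform article
    load = Load(spark)

    # placeholder flow: read a CSV file, then persist it to PostgreSQL
    flights_df = extract.read_csv("data/flights.csv")
    load.load_csv_db(
        df=flights_df,
        password="postgres",
        db_table="flights",
        url="jdbc:postgresql://localhost:5432/flightsdb",
        user="postgres",
        mode="overwrite",
    )


if __name__ == "__main__":
    run()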

Extract

The extract module has a class that contains all the utilities we need to get data from different sources.
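The class below uses json, StructType, SparkSession and DataFrame, so the top of extract.py needs imports along these lines:

import json

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType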

class Extract:
    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark

Reading a table from the database

This class method reads a specific table from a given database:

    def read_table_db(self, password: str, db_table: str, url: str, user: str) -> DataFrame:
        tableDataFrame = self.spark.read.format("jdbc") \
            .option("url", url) \
            .option("dbtable", db_table) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", "org.postgresql.Driver") \
            .load()

        return tableDataFrame

We can make this method read from different databases by making the driver an argument, as sketched below.
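A possible variant, keeping PostgreSQL as the default (the MySQL driver mentioned in the comment is just an example of another JDBC driver we could pass):

    def read_table_db(self, password: str, db_table: str, url: str, user: str,
                      driver: str = "org.postgresql.Driver") -> DataFrame:
        # e.g. pass driver="com.mysql.cj.jdbc.Driver" to read from MySQL instead
        return self.spark.read.format("jdbc") \
            .option("url", url) \
            .option("dbtable", db_table) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", driver) \
            .load()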

Reading a CSV file

This class method reads a CSV file. If we provide a schema file, it returns a DataFrame that follows the JSON schema; otherwise it lets Spark infer the schema:

    def read_csv(self, file_name: str, sep: str = ",", header: str = "true", schema_file: str = "") -> DataFrame:
        if schema_file == "":
            # no schema file provided: let Spark infer the column types
            return self.spark.read.format("csv") \
                .option("header", header) \
                .option("sep", sep) \
                .option("inferSchema", "true") \
                .load(file_name)
        else:
            return self.spark.read.format("csv") \
                .option("header", header) \
                .option("sep", sep) \
                .schema(self.get_schema(schema_file)) \
                .load(file_name)


    def get_schema(self, schema_path: str) -> StructType:
        # the file holds a schema in the JSON format that StructType.fromJson() understands
        with open(schema_path) as f:
            d = json.load(f)
            schema = StructType.fromJson(d)
        return schema
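The schema file is simply the JSON representation of a Spark StructType. One way to produce such a file, with hypothetical column names and file path, is to build the schema once and dump its jsonValue():

import json

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# hypothetical columns for the flights CSV
flights_schema = StructType([
    StructField("flight_id", IntegerType(), True),
    StructField("origin", StringType(), True),
    StructField("destination", StringType(), True),
])

# write the schema in the format StructType.fromJson() expects
with open("flights_schema.json", "w") as f:
    json.dump(flights_schema.jsonValue(), f, indent=2)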

Load

The load module has a class Load with utilities for loading the results of a transformation into a database or any other storage relevant to our business need. Here we will persist the results in the PostgreSQL database.

from pyspark.sql import DataFrame, SparkSession

class Load:
    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark

Loading a file to the database

This utility writes a DataFrame to the database:

    def load_csv_db(self, df: DataFrame, password: str, db_table: str, url: str, user: str, mode: str = "default"):
        df.write \
            .format("jdbc") \
            .option("url", url) \
            .option("dbtable", db_table) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", "org.postgresql.Driver") \
            .mode(mode) \
            .save()

The mode argument can take the following values:

- "append": append data to the existing table, creating the table if it does not exist
- "overwrite": overwrite existing data in the table
- "error" or "errorifexists": throw an exception if data already exists
- "ignore": silently ignore this operation if data already exists
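For example, a daily job that keeps adding rows to the same table could call the utility like this (spark and daily_flights_df are assumed to exist already; the table name and connection details are placeholders):

load = Load(spark)
load.load_csv_db(
    df=daily_flights_df,
    password="postgres",
    db_table="flights",
    url="jdbc:postgresql://localhost:5432/flightsdb",
    user="postgres",
    mode="append",  # keep the existing rows and add the new batch
)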

Transform

The transform module will contain the business logic that actually transforms the data and creates the intelligence needed by the end users. In this section we will explain the structure; we will dive into examples of the transformations in the transform article.

Transform class

The class also relies on a SparkSession as the entry point to Spark. It contains the business logic as class methods.

from pyspark.sql import DataFrame, SparkSession

class Transform:
    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark
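As a placeholder for the kind of method that will live here, a hypothetical aggregation that counts flights per origin airport (the column name is an assumption, not part of the actual dataset logic):

    def flights_per_origin(self, flights_df: DataFrame) -> DataFrame:
        # hypothetical transformation: count flights grouped by origin airport
        return flights_df.groupBy("origin").count()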

Conclusion

The above structure enables us to approach the business logic in a modular way. We will explore the transform module further in the following article, Transform module.