Using PySpark to process a flight dataset


To understand the full context of these transformations, you should first see how to deploy a dev environment. However, you can also read this post on its own as a dive into some PySpark examples of data transformations.

Datasets

Flights

The raw CSV files that contain the flight data have the following columns.

Name                Description
Year                2008
Month               11
DayofMonth          1-31
DayOfWeek           1 (Monday) - 7 (Sunday)
DepTime             actual departure time (local, hhmm)
CRSDepTime          scheduled departure time (local, hhmm)
ArrTime             actual arrival time (local, hhmm)
CRSArrTime          scheduled arrival time (local, hhmm)
UniqueCarrier       unique carrier code
FlightNum           flight number
TailNum             plane tail number
ActualElapsedTime   actual elapsed time of the flight, in minutes
CRSElapsedTime      scheduled (estimated) elapsed time of the flight, in minutes
AirTime             flight time, in minutes
ArrDelay            difference between scheduled and actual arrival time, in minutes; early arrivals show negative numbers
DepDelay            difference between scheduled and actual departure time, in minutes; early departures show negative numbers
Origin              origin IATA airport code
Dest                destination IATA airport code
Distance            distance between airports, in miles
TaxiIn              time elapsed between wheels down and arrival at the destination airport gate, in minutes
TaxiOut             time elapsed between departure from the origin airport gate and wheels off, in minutes
Cancelled           whether the flight was cancelled (1 = yes, 0 = no)
CancellationCode    reason for cancellation (A = carrier, B = weather, C = NAS, D = security)
Diverted            whether the flight was diverted (1 = yes, 0 = no)
CarrierDelay        in minutes
WeatherDelay        in minutes
NASDelay            in minutes
SecurityDelay       in minutes
LateAircraftDelay   in minutes
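
The four time columns (DepTime, CRSDepTime, ArrTime, CRSArrTime) are stored as hhmm values rather than as timestamps, so they cannot be subtracted directly. As a minimal sketch, the plain-Python helper below converts such a value to minutes after midnight. The helper name and the choice to treat empty strings as missing are assumptions for illustration, not part of the original pipeline.

```python
from typing import Optional, Union


def hhmm_to_minutes(value: Union[int, str, None]) -> Optional[int]:
    """Convert a local hhmm time (e.g. 1430 or "0905") to minutes after midnight.

    Returns None for missing values (assumption: None or an empty string).
    """
    if value is None or str(value).strip() == "":
        return None
    hours, minutes = divmod(int(value), 100)
    if minutes >= 60:
        raise ValueError(f"not a valid hhmm time: {value!r}")
    return hours * 60 + minutes


print(hhmm_to_minutes(1430))    # 14:30 -> 870
print(hhmm_to_minutes("0905"))  # 09:05 -> 545
```

The same arithmetic (integer division and remainder by 100) can be expressed with Spark column operations if the conversion has to run on the full dataset.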

In addition, we loaded two datasets with information about airports and carriers into the Postgres database.

Airports

Name      Description
iata      IATA airport code
airport   name of the airport
city      city where the airport is located
state     state where the airport is located
country   country where the airport is located
lat       latitude
long      longitude

Carriers

Name          Description
Code          airline code
Description   airline description

Transform

The transform module contains a class that holds all the business logic and utilities to process the data:

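from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import broadcast, col, count, sum, when  # this sum shadows Python's built-in

class Transform:
    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark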

Each of the functions below is a method of this class.

Number of flights that a carrier serves per month for a particular line

def flights_per_month_carrier_line(self, rawData: DataFrame) -> DataFrame:
    # Flight number, status flags and cancellation code are part of the key,
    # so each carrier/line/month count is further split by those columns.
    columns_to_get = ["FlightNum", "Origin", "Dest", "Year", "Month",
                      "Distance", "Cancelled", "Diverted", "UniqueCarrier", "CancellationCode"]
    return rawData.groupBy(columns_to_get).count()
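
To see what this grouping produces without starting a Spark session, here is a plain-Python sketch of the same count using collections.Counter. The sample rows are hypothetical and only illustrate that a line splits into several result rows when Cancelled or CancellationCode differ.

```python
from collections import Counter

KEYS = ["FlightNum", "Origin", "Dest", "Year", "Month",
        "Distance", "Cancelled", "Diverted", "UniqueCarrier", "CancellationCode"]


def count_by_keys(rows, keys=KEYS):
    """Count rows per distinct combination of the key columns."""
    return Counter(tuple(row[k] for k in keys) for row in rows)


# Hypothetical sample rows, not taken from the real dataset.
base = {"FlightNum": 100, "Origin": "AAA", "Dest": "BBB", "Year": 2008, "Month": 11,
        "Distance": 300, "Cancelled": 0, "Diverted": 0, "UniqueCarrier": "XX",
        "CancellationCode": None}
rows = [base, dict(base), dict(base, Cancelled=1, CancellationCode="B")]

counts = count_by_keys(rows)
for key, n in counts.items():
    print(key, n)
```

The two flown flights end up in one group and the cancelled one in another, which is the behaviour to expect from the groupBy above.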

Number of flights cancelled for a particular carrier per month

def flights_cancelled_per_month_carrier(self, rawData: DataFrame) -> DataFrame:
    # Keep cancelled flights only, then count them per line, month and carrier.
    return rawData.filter(rawData.Cancelled == 1)\
        .groupBy("Origin", "Dest", "Year", "Month", "UniqueCarrier", "Cancelled").count()

Average Delays for a carrier per day

def average_delays_per_line_carrier_day(self, rawData: DataFrame) -> DataFrame:
    # Ignore cancelled flights and early arrivals (negative ArrDelay).
    return rawData.filter(rawData.Cancelled == 0).filter(rawData.ArrDelay >= 0)\
        .groupBy("Origin", "Dest", "Year", "Month", "DayofMonth", "UniqueCarrier")\
        .avg("ArrDelay")
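
The effect of the two filters is easiest to check on a handful of rows. The following plain-Python sketch is a reference version of the same average, useful for comparing against the Spark result in a unit test. The function name and the sample rows are hypothetical, and dropping rows with a missing ArrDelay is an assumption that mirrors how a null comparison is filtered out in Spark.

```python
from collections import defaultdict

KEY_COLUMNS = ("Origin", "Dest", "Year", "Month", "DayofMonth", "UniqueCarrier")


def average_arrival_delay(rows):
    """Average ArrDelay per key, skipping cancelled flights and early arrivals."""
    totals = defaultdict(lambda: [0, 0])  # key -> [sum of delays, number of flights]
    for row in rows:
        delay = row["ArrDelay"]
        # A missing delay is dropped too, as a null comparison would be in Spark.
        if row["Cancelled"] != 0 or delay is None or delay < 0:
            continue
        key = tuple(row[c] for c in KEY_COLUMNS)
        totals[key][0] += delay
        totals[key][1] += 1
    return {key: total / n for key, (total, n) in totals.items()}


# Hypothetical sample rows for one line, day and carrier.
base = {"Origin": "AAA", "Dest": "BBB", "Year": 2008, "Month": 11,
        "DayofMonth": 3, "UniqueCarrier": "XX", "Cancelled": 0}
rows = [
    dict(base, ArrDelay=10),
    dict(base, ArrDelay=30),
    dict(base, ArrDelay=-5),                 # early arrival, ignored
    dict(base, ArrDelay=None, Cancelled=1),  # cancelled, ignored
]
averages = average_arrival_delay(rows)
print(averages)
```

Only the 10 and 30 minute delays contribute, so the average for this group is 20 minutes rather than the lower figure that the early arrival would give.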

Cancelled flights per month

def cancelled_flights_per_month(self, rawData: DataFrame) -> DataFrame:
    return rawData.filter(rawData.Cancelled == 1)\
        .groupBy("Origin", "Dest", "Year", "Month", "UniqueCarrier").count()\
        .withColumnRenamed("count", "numCancelled")

Percentage of cancelled and delayed flights per month for each carrier

def monthly_cancelled_delayed_flights_percentage(self, rawData: DataFrame) -> DataFrame:
    # A flight counts as delayed when it was neither cancelled nor diverted
    # and arrived at least 5 minutes late.
    delayed = when(rawData.Cancelled == 1, 0)\
        .when(rawData.Diverted == 1, 0)\
        .when(rawData.ArrDelay >= 5, 1).otherwise(0)

    # Plain column arithmetic keeps the percentages numeric; a udf declared
    # without a return type would produce string columns.
    return rawData.groupBy("Origin", "Dest", "Year", "Month", "UniqueCarrier")\
        .agg(sum(rawData.Cancelled).alias("nCancelled"),
             count("*").alias("nFlights"),
             sum(delayed).alias("nDelayed"),
             sum(col("Diverted")).alias("nDiverted"))\
        .withColumn("%Cancelled", col("nCancelled") * 100 / col("nFlights"))\
        .withColumn("%Delayed", col("nDelayed") * 100 / col("nFlights"))
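
The order of the when clauses matters: cancelled and diverted flights are ruled out before the arrival delay is looked at. As a sanity check, the plain-Python sketch below applies the same rules to the rows of a single group. The function names and the sample rows are hypothetical, and the 5 minute threshold is taken from the code above.

```python
def is_delayed(row, threshold=5):
    """Mirror the when/otherwise chain: cancelled and diverted flights never count as delayed."""
    if row["Cancelled"] == 1 or row["Diverted"] == 1:
        return False
    return row["ArrDelay"] is not None and row["ArrDelay"] >= threshold


def percentages(rows):
    """Return (%Cancelled, %Delayed) for the rows of one group."""
    n_flights = len(rows)
    n_cancelled = sum(row["Cancelled"] for row in rows)
    n_delayed = sum(1 for row in rows if is_delayed(row))
    return n_cancelled * 100 / n_flights, n_delayed * 100 / n_flights


# Hypothetical group of four flights: on time, delayed, cancelled, diverted.
rows = [
    {"Cancelled": 0, "Diverted": 0, "ArrDelay": 2},
    {"Cancelled": 0, "Diverted": 0, "ArrDelay": 12},
    {"Cancelled": 1, "Diverted": 0, "ArrDelay": None},
    {"Cancelled": 0, "Diverted": 1, "ArrDelay": None},
]
pct_cancelled, pct_delayed = percentages(rows)
print(pct_cancelled, pct_delayed)
```

Both percentages use the total number of flights in the group as the denominator, so cancelled flights lower the delayed percentage instead of being excluded from it.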

Enrich flight data with the carrier name

def add_carrier_name(self, rawData: DataFrame, carrierDf: DataFrame) -> DataFrame:
    # The carriers table is small, so it is broadcast to avoid shuffling the flights.
    joinExpression = carrierDf.Code == rawData.UniqueCarrier
    return rawData.join(broadcast(carrierDf), joinExpression, "left_outer")
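
A left outer join keeps every flight, including those whose carrier code has no match in the carriers table; for those rows the carrier columns are empty. The broadcast hint asks Spark to ship the small carriers table to every executor, which is conceptually a dictionary lookup. The plain-Python sketch below illustrates those semantics; the carrier codes and names are made up for the example.

```python
def add_carrier_name(flights, carriers):
    """Left outer join of flights with carriers on UniqueCarrier == Code."""
    lookup = {carrier["Code"]: carrier for carrier in carriers}  # small table held in memory
    enriched = []
    for flight in flights:
        carrier = lookup.get(flight["UniqueCarrier"])
        enriched.append({
            **flight,
            # Unmatched flights are kept, with empty carrier columns.
            "Code": carrier["Code"] if carrier else None,
            "Description": carrier["Description"] if carrier else None,
        })
    return enriched


# Hypothetical carriers and flights.
carriers = [{"Code": "XX", "Description": "Example Air"}]
flights = [{"FlightNum": 100, "UniqueCarrier": "XX"},
           {"FlightNum": 200, "UniqueCarrier": "ZZ"}]

enriched = add_carrier_name(flights, carriers)
for row in enriched:
    print(row)
```

The flight operated by the unknown carrier ZZ is still present in the output, which is the reason to prefer a left outer join over an inner join when enriching fact data.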