To understand the context of these transformations, you should first check how to deploy the dev environment, etc. However, you can also read this post on its own as a dive into some PySpark examples of data transformations.
Datasets
Flights
The raw CSV files that contain the flight data have the following columns:
Name | Description |
---|---|
Year | 2008 |
Month | 11 |
DayofMonth | 1-31 |
DayOfWeek | 1 (Monday) - 7 (Sunday) |
DepTime | actual departure time (local, hhmm) |
CRSDepTime | scheduled departure time (local, hhmm) |
ArrTime | actual arrival time (local, hhmm) |
CRSArrTime | scheduled arrival time (local, hhmm) |
UniqueCarrier | unique carrier code |
FlightNum | flight number |
TailNum | plane tail number |
ActualElapsedTime | in minutes |
CRSElapsedTime | scheduled (CRS) elapsed time of the flight, in minutes |
AirTime | flight time, in minutes |
ArrDelay | difference between actual and scheduled arrival time, in minutes; early arrivals show negative numbers |
DepDelay | difference between actual and scheduled departure time, in minutes; early departures show negative numbers |
Origin | origin IATA airport code |
Dest | destination IATA airport code |
Distance | Distance between airports (miles) |
TaxiIn | time elapsed between wheels down and arrival at the destination airport gate, in minutes |
TaxiOut | time elapsed between departure from the origin airport gate and wheels off, in minutes |
Cancelled | was the flight cancelled? (1 = yes, 0 = no) |
CancellationCode | reason for cancellation (A = carrier, B = weather, C = NAS, D = security) |
Diverted | 1 = yes, 0 = no |
CarrierDelay | in minutes |
WeatherDelay | in minutes |
NASDelay | in minutes |
SecurityDelay | in minutes |
LateAircraftDelay | in minutes |
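The time columns are stored as local `hhmm` integers (e.g. 1435 for 14:35), so they cannot be subtracted directly. As a minimal plain-Python sketch (the helpers `hhmm_to_minutes` and `delay_minutes` are hypothetical and not part of the pipeline), a delay can be recovered from an actual and a scheduled time, assuming the flight does not cross midnight:

```python
def hhmm_to_minutes(hhmm: int) -> int:
    """Convert a local hhmm integer (e.g. 1435 for 14:35) to minutes after midnight."""
    hours, minutes = divmod(hhmm, 100)
    return hours * 60 + minutes


def delay_minutes(actual_hhmm: int, scheduled_hhmm: int) -> int:
    """Actual minus scheduled time in minutes; a negative value means early.

    Assumes both times fall on the same local day (no midnight crossing).
    """
    return hhmm_to_minutes(actual_hhmm) - hhmm_to_minutes(scheduled_hhmm)


print(delay_minutes(1435, 1410))  # 25 minutes late
print(delay_minutes(955, 1005))   # 10 minutes early
```

This is the same sign convention as `ArrDelay` and `DepDelay`: late is positive, early is negative.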
In addition, we loaded two datasets with information about airports and carriers into the Postgres database.
Airports
Name | Description |
---|---|
iata | IATA airport code |
airport | name of the airport |
city | the city where the airport is located |
state | state where the airport is located |
country | country where the airport is located |
lat | latitude |
long | longitude |
Carriers
Name | Description |
---|---|
Code | airline code |
Description | Airline description |
Transform
The transform module contains a class that holds all the business logic and utilities used to process the data:
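```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import broadcast, col, count, sum, when  # shadows built-in sum


class Transform:
    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark

    # TODO: add appropriate functions
```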
Number of flights that a carrier serves per month for a particular line
```python
def flights_per_month_carrier_line(self, rawData: DataFrame) -> DataFrame:
    columns_to_get = ['FlightNum', 'Origin', 'Dest', 'Year', 'Month',
                      'Distance', 'Cancelled', 'Diverted', 'UniqueCarrier', 'CancellationCode']
    # one output row per distinct combination of the columns above, with its flight count
    data_counts = rawData.groupBy(columns_to_get).count()
    return data_counts
```
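Because `Cancelled`, `Diverted` and `CancellationCode` are part of the grouping key, the cancelled occurrences of a flight end up in a separate row from its regular operations. A small plain-Python model of `groupBy(...).count()` makes this visible; the sample rows are made up and only a subset of the key columns is kept:

```python
from collections import Counter

# Made-up sample rows; only a subset of the real grouping columns is kept.
rows = [
    {"FlightNum": 100, "Origin": "JFK", "Dest": "LAX", "Month": 11, "Cancelled": 0},
    {"FlightNum": 100, "Origin": "JFK", "Dest": "LAX", "Month": 11, "Cancelled": 0},
    {"FlightNum": 100, "Origin": "JFK", "Dest": "LAX", "Month": 11, "Cancelled": 1},
    {"FlightNum": 200, "Origin": "SFO", "Dest": "ORD", "Month": 11, "Cancelled": 0},
]

key_columns = ["FlightNum", "Origin", "Dest", "Month", "Cancelled"]

# Equivalent of rawData.groupBy(key_columns).count(): one count per distinct key tuple.
counts = Counter(tuple(row[c] for c in key_columns) for row in rows)

for key, n in sorted(counts.items()):
    print(key, n)
```

Flight 100 appears in two groups: two operated flights and one cancelled flight. Removing the status columns from the key would give a single monthly total per line instead.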
Number of flights cancelled for a particular carrier per month
```python
def flights_cancelled_per_month_carrier(self, rawData: DataFrame) -> DataFrame:
    # keep only cancelled flights, then count them per line, month and carrier
    nTripsPerLineCancelledPerMonth = (rawData.filter(rawData.Cancelled == 1)
        .groupBy("Origin", "Dest", "Year", "Month", "UniqueCarrier", "Cancelled")
        .count())
    return nTripsPerLineCancelledPerMonth
```
Average delays per line for a carrier per day
```python
def average_delays_per_line_carrier_day(self, rawData: DataFrame) -> DataFrame:
    # ignore cancelled flights and don't take early arrivals (negative ArrDelay) into account
    return (rawData.filter(rawData.Cancelled == 0)
            .filter(rawData.ArrDelay >= 0)
            .groupBy("Origin", "Dest", "Year", "Month", "DayofMonth", "UniqueCarrier")
            .avg("ArrDelay"))
```
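The two filters change what the average means: cancelled flights are dropped, and early arrivals are excluded rather than counted, so the result is the average delay of on-time or late flights only (in Spark, a null `ArrDelay` also fails the `>= 0` filter). A plain-Python sketch of the same computation over made-up rows:

```python
from collections import defaultdict

# Made-up rows: (Origin, Dest, DayofMonth, UniqueCarrier, Cancelled, ArrDelay)
rows = [
    ("JFK", "LAX", 1, "AA", 0, 10),
    ("JFK", "LAX", 1, "AA", 0, 30),
    ("JFK", "LAX", 1, "AA", 0, -5),    # early arrival: ignored
    ("JFK", "LAX", 1, "AA", 1, None),  # cancelled: ignored
    ("JFK", "LAX", 2, "AA", 0, 0),
]

totals = defaultdict(lambda: [0, 0])  # key -> [sum of delays, number of flights]
for origin, dest, day, carrier, cancelled, arr_delay in rows:
    # Same filters as the Spark code: Cancelled == 0 and ArrDelay >= 0.
    if cancelled == 0 and arr_delay is not None and arr_delay >= 0:
        acc = totals[(origin, dest, day, carrier)]
        acc[0] += arr_delay
        acc[1] += 1

avg_delay = {key: s / n for key, (s, n) in totals.items()}
print(avg_delay)
```

Had the early arrival been kept, the day-1 average would drop from 20 to 35/3, about 11.7 minutes.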
Cancelled flights per month
```python
def cancelled_flights_per_month(self, rawData: DataFrame) -> DataFrame:
    return (rawData.filter(rawData.Cancelled == 1)
            .groupBy("Origin", "Dest", "Year", "Month", "UniqueCarrier")
            .count()
            .withColumnRenamed("count", "numCancelled"))
```
Percentage of cancelled and delayed flights per month for each carrier
```python
def monthly_cancelled_delayed_flights_percentage(self, rawData: DataFrame) -> DataFrame:
    # delayed = neither cancelled nor diverted, and arrived at least 5 minutes late
    delayed = (when(rawData.Cancelled == 1, 0)
               .when(rawData.Diverted == 1, 0)
               .when(rawData.ArrDelay >= 5, 1)
               .otherwise(0))
    return (rawData.groupBy("Origin", "Dest", "Year", "Month", "UniqueCarrier")
            .agg(sum(rawData.Cancelled).alias("nCancelled"),
                 count("*").alias("nFlights"),
                 sum(delayed).alias("nDelayed"),
                 sum(col("Diverted")).alias("nDiverted"))
            # built-in arithmetic: a udf() without a return type would yield strings
            .withColumn("%Cancelled", col("nCancelled") * 100 / col("nFlights"))
            .withColumn("%Delayed", col("nDelayed") * 100 / col("nFlights")))
```
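The `when` chain is evaluated top to bottom, so a cancelled or diverted flight is never counted as delayed even if it carries an `ArrDelay` value. The following plain-Python model of one group's aggregation (made-up rows; the function names `is_delayed` and `summarize` are hypothetical) shows the arithmetic behind `%Cancelled` and `%Delayed`:

```python
def is_delayed(cancelled: int, diverted: int, arr_delay) -> int:
    """Mirror of the when/otherwise chain: the first matching branch wins."""
    if cancelled == 1:
        return 0
    if diverted == 1:
        return 0
    if arr_delay is not None and arr_delay >= 5:
        return 1
    return 0


def summarize(flights):
    """flights: list of (Cancelled, Diverted, ArrDelay) tuples for one group."""
    n_flights = len(flights)
    n_cancelled = sum(c for c, _, _ in flights)
    n_delayed = sum(is_delayed(c, d, a) for c, d, a in flights)
    n_diverted = sum(d for _, d, _ in flights)
    return {
        "nFlights": n_flights,
        "nCancelled": n_cancelled,
        "nDelayed": n_delayed,
        "nDiverted": n_diverted,
        "%Cancelled": n_cancelled * 100 / n_flights,
        "%Delayed": n_delayed * 100 / n_flights,
    }


group = [(0, 0, 12), (0, 0, 3), (1, 0, None), (0, 1, 40)]
print(summarize(group))
```

Only the first flight counts as delayed: the second is under the 5-minute threshold, and the diverted flight is excluded despite its 40-minute delay.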
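Enrich the flight data with the carrier name
```python
def add_carrier_name(self, rawData: DataFrame, carrierDf: DataFrame) -> DataFrame:
    joinExpression = carrierDf.Code == rawData.UniqueCarrier
    # broadcast the small carriers table; the left outer join keeps flights with unknown carriers
    return rawData.join(broadcast(carrierDf), joinExpression, "left_outer")
```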
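Two choices are at play here: `broadcast` hints Spark to ship the small carriers table to every executor so the large flights DataFrame does not have to be shuffled, and `left_outer` keeps every flight even when its `UniqueCarrier` has no match, in which case the carrier columns are null. The join semantics (not the broadcast mechanics) can be modeled in plain Python with a dictionary lookup over made-up data:

```python
# Made-up carriers lookup (the small, broadcast side of the join).
carriers = {"AA": "American Airlines Inc.", "UA": "United Air Lines Inc."}

flights = [
    {"FlightNum": 100, "UniqueCarrier": "AA"},
    {"FlightNum": 200, "UniqueCarrier": "ZZ"},  # unknown code: kept, with no name
]

# Left outer join: every flight survives; unmatched carrier columns become None (null in Spark).
enriched = []
for f in flights:
    name = carriers.get(f["UniqueCarrier"])
    code = f["UniqueCarrier"] if name is not None else None
    enriched.append({**f, "Code": code, "Description": name})

for row in enriched:
    print(row)
```

Since the join uses a column expression, the result keeps both `UniqueCarrier` and `Code`; dropping one of them after the join avoids a redundant column.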