Data Engineering Zoomcamp | Week 2.3. ETL API to Postgres
In this post, we'll be creating a data pipeline that ingests a compressed CSV file, performs some light transformations, and then loads the data into Postgres.
Data Ingestion
Right, first things first, we need to add a block to a pipeline. I've created a pipeline and I'm now adding a block into it. This block will be a data loader. It will use Python as its language and will authenticate and connect to Postgres. The connection to Postgres was created in the previous blog.

Here's the code itself:
```python
import pandas as pd

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data(*args, **kwargs):
    url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-07.csv.gz"

    # Map the column dtypes up front so pandas doesn't have to infer them.
    mapped_dtypes = {
        'VendorID': float,
        'passenger_count': float,
        'trip_distance': float,
        'RatecodeID': float,
        'store_and_fwd_flag': str,
        'PULocationID': float,
        'DOLocationID': float,
        'payment_type': float,
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float,
    }

    # The datetime columns get parsed during ingestion via parse_dates.
    dates_dtypes = [
        'tpep_pickup_datetime',
        'tpep_dropoff_datetime',
    ]

    df = pd.read_csv(url, compression="gzip", dtype=mapped_dtypes, parse_dates=dates_dtypes)
    return df


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
```
We use pandas as our dataframe library of choice. You can spot the @data_loader decorator, which indicates to Mage that this block will produce a dataframe and pass it on to blocks downstream.
We've done some column datatype remapping. This can be executed during ingestion, as the pandas.read_csv function has a dtype parameter for mapping the schema. Also, rather than using pd.to_datetime after the data is read in, we can leverage the parse_dates parameter. Of course, if a column contained dates in multiple formats, we may indeed need to perform this transformation after ingestion instead of using the input parameter. It's also really nice that pandas.read_csv can be supplied a URL - in this case a .csv - to ingest, AND you can supply the compression - in this case, gzip!
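As an aside, if those datetime columns ever did arrive in mixed formats, the post-ingestion fix might look something like this sketch. It's an illustration rather than part of the pipeline above, and the format='mixed' option assumes pandas 2.0+:

```python
import pandas as pd

url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-07.csv.gz"

# Read the timestamp column as a plain string first...
df = pd.read_csv(url, compression="gzip", dtype={'tpep_pickup_datetime': str})

# ...then parse it after ingestion. format='mixed' (pandas >= 2.0) infers the
# format per element; on older versions, errors='coerce' would let you spot
# the awkward rows as NaT values instead.
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'], format='mixed')
```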
We then use a unit test with the assert statement in the @test decorator code block. This can ensure the data passed from this block to the subsequent downstream blocks is expected and well defined (no hidden surprises! 🤪).
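The template's test only asserts that the output exists, but nothing stops us from being stricter. Here's a sketch of what an extended test could look like; the extra assertions are my own additions, not part of Mage's template:

```python
@test
def test_output(output, *args) -> None:
    assert output is not None, 'The output is undefined'
    assert len(output) > 0, 'The output contains no rows'
    # parse_dates should have given us a proper datetime column.
    assert str(output['tpep_pickup_datetime'].dtype).startswith('datetime64'), \
        'tpep_pickup_datetime was not parsed as a datetime'
```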
Light Transformation
Here we'll add another block to our pipeline. We'll be creating a transformer block, and this will link to the previous data_loader block.
Above, I've selected a generic template using Python. In the data, there's a column called 'passenger_count'. I'm aiming to remove every row where there are 0 passengers. How can we have passenger journeys without passengers?! 🧐 Therefore, the light transformation will filter the dataframe from our data_ingestion block, and a unit test will verify that the output of the transformation block only contains rides that actually included passengers!
I'll supply a picture and also the code below it (for copy and pasta purposes, of course 😌). Note, in the picture, that the two blocks have a connection formed between them (icons on the right-hand side)!
Picture from Mage
Code from Mage
&& here's the code for the copy/pasta enthusiasts:
```python
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    # Drop every ride that recorded zero passengers.
    data = data[data['passenger_count'] != 0]
    return data


@test
def test_output(output, *args) -> None:
    assert output['passenger_count'].isin([0]).sum() == 0, \
        'The output contains taxi rides without passengers!'
```
Above, we can see the transformer block and the code it's comprised of. Unlike the previous code block, we have a decorator in here called @transformer. This asks for a data parameter within the function. If you haven't guessed already, this is the dataframe output from the data_ingestion block that it's connected to:
To create the connection between the blocks, we hover over the block, wait for a circle to appear around the edge, and then drag from that circle to begin the connection path!
Lastly, talking about the code in the block itself: we filter our dataframe to remove rows where passenger_count is 0. We then use the assertion test to verify the passenger_count column no longer contains any 0s.
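If you want a bit of visibility into how much data the filter removes, a small addition (my own tweak, not part of the template) could log the row counts:

```python
@transformer
def transform(data, *args, **kwargs):
    before = len(data)
    data = data[data['passenger_count'] != 0]
    # Mage surfaces stdout in the block's output panel, so a simple
    # print is enough to see how many zero-passenger rows were dropped.
    print(f'Dropped {before - len(data)} rows with 0 passengers')
    return data
```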
Load
Our last step is to load this data into our Postgres instance. From the previous blogs, we've already established a connection to Postgres, so now it should be a simple task of configuration! For the last block, we'll use the data exporter to get the job done.
- Step 1: choosing the block config:
- Step 2: checking out the template provided:
In the picture above, we can see the template has provided us with almost EVERYTHING we need to load into Postgres. If you recall, when we set up the connection to Postgres in the io_config.yaml file, we used a separate profile called dev and supplied the following arguments:
```yaml
dev:
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: "{{ env_var('POSTGRES_DBNAME') }}"
  POSTGRES_SCHEMA: "{{ env_var('POSTGRES_SCHEMA') }}"
  POSTGRES_USER: "{{ env_var('POSTGRES_USER') }}"
  POSTGRES_PASSWORD: "{{ env_var('POSTGRES_PASSWORD') }}"
  POSTGRES_HOST: "{{ env_var('POSTGRES_HOST') }}"
  POSTGRES_PORT: "{{ env_var('POSTGRES_PORT') }}"
```
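Each of those values is pulled from an environment variable, so they need to exist wherever Mage runs. A hypothetical .env file might look like this (placeholder values, obviously not real credentials):

```
POSTGRES_DBNAME=ny_taxi
POSTGRES_SCHEMA=mage
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
```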
This is why the data_exporter block only requires us to provide the profile, the schema name, and the table name.
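For reference, my filled-in exporter looked roughly like the sketch below. The schema and table names are my own choices, and the import paths follow the template Mage generated for me at the time of writing - yours may differ slightly by version:

```python
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    schema_name = 'ny_taxi'          # assumed schema name
    table_name = 'yellow_taxi_data'  # assumed table name
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'           # the profile we defined above

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,           # don't write the dataframe index
            if_exists='replace',   # replace the table if it already exists
        )
```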
As we can see in the output at the bottom of the picture: DONE. It was a success, hoorah! 🚀
Verifying
As a test, let's check the table in Postgres that we've just created. Let's connect to it! 🧐
- First up, create a data_loader block:
- Lastly, configure the template and run! (My configured block is sketched below.)
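My verification block looked something like this sketch; the schema and table names match the assumptions I made in the exporter above:

```python
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data_from_postgres(*args, **kwargs):
    # Count the rows we just exported to Postgres.
    query = 'SELECT COUNT(*) FROM ny_taxi.yellow_taxi_data'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        return loader.load(query)
```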
As we can see in the output: 780,906 rows in there. Happy days.
I hope you've enjoyed this one. I love the simplicity of Mage, and I think the templates it provides for the code blocks are absolutely ACE. My only qualm, as of the date of writing this blog, is the lack of IntelliSense when building. I did reach out to Mage on Slack and they've indicated this is coming in the future, which is splendid news. For transparency, I used both the pandas documentation and Google Colab to run inline tests on the code. Once I was content, I would copy it across to Mage!