Data Engineering Zoomcamp | Week 2.3: ETL API to Postgres

In this post, we'll be creating a data pipeline that ingests a compressed CSV file, performs some light transformations, and then loads the data into Postgres.

Data Ingestion

Right, first things first: we need to add a block to a pipeline. I've created a pipeline and I'm now adding a block to it. This block will be a data loader. It will use Python as its language and will authenticate and connect to Postgres. The connection to Postgres was created in the previous blog post.

Here's a GIF of how the block will look:

Here's the code, itself:

import pandas as pd

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data(*args, **kwargs):

    url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-07.csv.gz"

    # Map column dtypes up front; floats (rather than ints) are used so that
    # columns containing missing values still load cleanly
    mapped_dtypes = {
        'VendorID': float,
        'passenger_count': float,
        'trip_distance': float,
        'RatecodeID': float,
        'store_and_fwd_flag': str,
        'PULocationID': float,
        'DOLocationID': float,
        'payment_type': float,
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float,
    }

    # Parse the pickup/dropoff timestamps as part of the read
    dates_dtypes = [
        'tpep_pickup_datetime',
        'tpep_dropoff_datetime'
    ]

    df = pd.read_csv(url, compression="gzip", dtype=mapped_dtypes, parse_dates=dates_dtypes)

    return df


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

We use pandas as our dataframe library of choice. You can spot the @data_loader decorator, which tells Mage that this block loads data and passes the resulting dataframe on to blocks downstream.

We've done some column datatype remapping. This can be executed during ingestion, as pandas.read_csv has a dtype parameter for mapping the schema. Also, rather than using pd.to_datetime after the data is read in, we can leverage the parse_dates parameter. Of course, if a column contained dates in multiple formats, we might need to perform that conversion after ingestion instead of relying on the input parameter. It's also really nice that pandas.read_csv can be supplied a URL to ingest, in this case a .csv, AND you can specify the compression, in this case gzip!
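
If the timestamps did arrive in mixed or messy formats, a post-ingestion conversion is easy enough. Here's a minimal sketch (reusing the url and mapped_dtypes from the loader above; purely illustrative, not part of the Mage template):

import pandas as pd

# Read without parse_dates first...
df = pd.read_csv(url, compression="gzip", dtype=mapped_dtypes)

# ...then convert afterwards. errors="coerce" turns anything unparseable
# into NaT rather than raising, which is handy for messy columns.
for col in ['tpep_pickup_datetime', 'tpep_dropoff_datetime']:
    df[col] = pd.to_datetime(df[col], errors="coerce")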

We then use a unit test, via the assert statement in the @test-decorated function. This ensures the data passed from this block to the subsequent downstream blocks is what we expect and well defined (no hidden surprises! 🤪).
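
If you wanted that test to be a touch stricter, you could layer a couple of extra assertions into the same @test function. A small sketch (the extra checks are my own additions, not part of the template):

@test
def test_output(output, *args) -> None:
    assert output is not None, 'The output is undefined'
    # Hypothetical extra checks: the dataframe isn't empty and the
    # pickup timestamp really was parsed as a datetime
    assert len(output) > 0, 'The output dataframe is empty'
    assert str(output['tpep_pickup_datetime'].dtype).startswith('datetime64'), \
        'tpep_pickup_datetime was not parsed as a datetime'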

Light Transformation

Here we'll add another block to our pipeline. We'll be creating a transformer block and this will link to the previous data_loader block.

trans-config

Above, I've selected a generic template using Python. In the data, there's a column called 'passenger_count'. I'm aiming to remove every row where there are 0 passengers. How can we have passenger journeys without passengers?! 🧐 Therefore, the light transformation will filter the dataframe from our data_ingestion block, and a unit test will verify that the output of the transformation block only contains rides that actually included passengers!

I'll supply a picture and also the code below it (for copy and pasta purposes, of course 😌). Note, in the picture, that the two blocks have a connection formed between them! (icons on the right hand side).

Picture from Mage

trans-code

Code from Mage

&& here's the code for the copy/pasta enthusiasts:

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):

    # Keep only rows where at least one passenger was recorded
    data = data[data['passenger_count'] != 0]
    return data


@test
def test_output(output, *args) -> None:

    assert output['passenger_count'].isin([0]).sum() == 0, 'The output contains taxi rides without passengers!'

Above, we can see the transformer block and the code it's made up of. Unlike the previous code block, we have a decorator in here called @transformer. This takes a data parameter within the function. If you haven't guessed already, this is the dataframe output from the data_ingestion block that it's connected to:

trans-connect

To create the connection between the blocks, we hover over the block, wait for a circle to appear around the edge, and then drag from that circle to begin the connection path!

Lastly, on the code in the block itself: we filter our dataframe to remove rows where passenger_count is 0. We then use the assertion test to verify the passenger_count column no longer contains any 0s.
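
If you want a little more visibility into what the filter is doing, one small (purely illustrative) variation is to print how many rows get dropped:

@transformer
def transform(data, *args, **kwargs):
    before = len(data)
    data = data[data['passenger_count'] != 0]
    # Report how many zero-passenger rows were removed
    print(f'Dropped {before - len(data)} rows with 0 passengers')
    return data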

Load

Our last step is to load this data into our Postgres instance. From the previous blogs, we've already established a connection to Postgres. Now, it should be a simple task of configuration! For the last block, we'll use the data exporter to get the job done.

  • Step 1: choosing the block config: de-export-config-step1
  • Step 2: checking out the template provided: de-export-config-step2

In the picture above, we can see the template has provided us with almost EVERYTHING we need to load into Postgres. If you recall, when we set up the connection to Postgres in the io_config.yaml file, we used a separate profile called dev and supplied the following arguments:

dev:
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: "{{ env_var('POSTGRES_DBNAME')}}"
  POSTGRES_SCHEMA: "{{ env_var('POSTGRES_SCHEMA')}}"
  POSTGRES_USER: "{{ env_var('POSTGRES_USER')}}"
  POSTGRES_PASSWORD: "{{ env_var('POSTGRES_PASSWORD')}}"
  POSTGRES_HOST: "{{ env_var('POSTGRES_HOST')}}"
  POSTGRES_PORT: "{{ env_var('POSTGRES_PORT')}}"

This is why the data_exporter only requires us to provide the profile, the schema name, and the table name.

So, we'll go ahead and provide these: de-export-config-step3
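
For reference, the filled-in exporter ends up looking roughly like the sketch below. The ny_taxi schema and yellow_taxi_data table names are just the ones I'm assuming here (use whatever you configured), and the exact import paths can differ slightly between Mage versions:

from os import path

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    schema_name = 'ny_taxi'          # assumed schema name
    table_name = 'yellow_taxi_data'  # assumed table name
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'           # the dev profile from io_config.yaml

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,          # don't write the dataframe index
            if_exists='replace',  # overwrite the table if it already exists
        )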

As we can see in the output at the bottom of the picture: DONE. It was a success, hoorah! 🚀

Verifying

As a test, let's check our table in Postgres that we've just created. Let's connect to it! 🧐

  • First up, create a data_loader block:
de-import-block-step4
  • Lastly, configure the template and run! de-import-block-step5

As we can see in the output: 780906 rows in there. Happy days.
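
For completeness, a loader built from Mage's Postgres template would look something along these lines (schema and table names assumed, as above; import paths again may vary by Mage version):

from os import path

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data_from_postgres(*args, **kwargs):
    # Count the rows we just exported (assumed schema.table)
    query = 'SELECT COUNT(*) FROM ny_taxi.yellow_taxi_data'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        return loader.load(query)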

I hope you've enjoyed this one. I love the simplicity of Mage and I think the templates it provides for the code blocks are absolutely ACE. My only qualm, as of writing this blog post, is the lack of IntelliSense when building. I did reach out to Mage on Slack and they've indicated this is coming in the future, which is splendid news. For transparency, I used both the pandas documentation and Google Colab to run inline tests on the code. Once I was content, I would copy it across to Mage!