Posted on Fri 11 March 2016 under Databases

A Billion Taxi Rides on Amazon EMR running Presto

This blog post covers how I took a billion+ records containing six years of taxi ride metadata from New York City and analysed them using Presto on Amazon EMR. I stored the data on S3 instead of HDFS so that I can launch EMR clusters only when I need them while paying only a few dollars a month to store the data permanently.

The links in this blog post pointing to the AWS console use the eu-west-1 region. This may or may not be the most suitable location for you. You can switch the region by clicking the drop down that reads "Ireland" in the top right of the console.

AWS CLI Up and Running

All the following commands were run on a fresh install of Ubuntu 14.04.3.

To start, I'll install the AWS CLI tool and a few dependencies it needs to run.

$ sudo apt update
$ sudo apt install \
    python-pip \
    python-virtualenv
$ virtualenv amazon
$ source amazon/bin/activate
$ pip install awscli

I'll then enter my AWS credentials.

$ read AWS_ACCESS_KEY_ID
$ read AWS_SECRET_ACCESS_KEY
$ export AWS_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY

104 GB of Data on S3

I've uploaded the taxi trip metadata to a folder called raw in an S3 bucket. This bucket lives in eu-west-1. When you launch your EMR cluster, it's important that the cluster and the S3 bucket holding your data live in the same region.

To generate this data please see the steps in my Billion Taxi Rides in Redshift blog post.

Replace <s3_bucket> with your bucket name in the following command.

$ aws s3 ls <s3_bucket>/raw | grep csv.gz$
... 2024092233 trips_xaa.csv.gz
... 2023686578 trips_xab.csv.gz
... 2022717460 trips_xac.csv.gz
... 2023507930 trips_xad.csv.gz
... 2024224441 trips_xae.csv.gz
... 2030784022 trips_xaf.csv.gz
... 2036427328 trips_xag.csv.gz
... 2038915704 trips_xah.csv.gz
... 2039556774 trips_xai.csv.gz
... 2039272131 trips_xaj.csv.gz
... 2019988039 trips_xak.csv.gz
... 2001054379 trips_xal.csv.gz
... 1999737958 trips_xam.csv.gz
... 1997898854 trips_xan.csv.gz
... 1997471865 trips_xao.csv.gz
... 2000533767 trips_xap.csv.gz
... 1998974328 trips_xaq.csv.gz
... 2009717364 trips_xar.csv.gz
... 2016934402 trips_xas.csv.gz
... 2022353263 trips_xat.csv.gz
... 2013756232 trips_xau.csv.gz
... 2013475097 trips_xav.csv.gz
... 2020404819 trips_xaw.csv.gz
... 2015585302 trips_xax.csv.gz
... 2008026925 trips_xay.csv.gz
... 2007821692 trips_xaz.csv.gz
... 2005814365 trips_xba.csv.gz
... 2013779043 trips_xbb.csv.gz
... 2018006900 trips_xbc.csv.gz
... 2019260716 trips_xbd.csv.gz
... 1893270062 trips_xbe.csv.gz
... 1854614596 trips_xbf.csv.gz
... 1854948290 trips_xbg.csv.gz
... 1855481821 trips_xbh.csv.gz
... 2008073682 trips_xbi.csv.gz
... 2023698241 trips_xbj.csv.gz
... 2028094175 trips_xbk.csv.gz
... 2030037816 trips_xbl.csv.gz
... 2048044463 trips_xbm.csv.gz
... 2031794840 trips_xbn.csv.gz
... 2034562660 trips_xbo.csv.gz
... 2034332904 trips_xbp.csv.gz
... 2036182649 trips_xbq.csv.gz
... 2035637794 trips_xbr.csv.gz
... 2026865353 trips_xbs.csv.gz
... 2028888024 trips_xbt.csv.gz
... 2027725347 trips_xbu.csv.gz
... 2027090129 trips_xbv.csv.gz
... 2024075447 trips_xbw.csv.gz
... 2025027115 trips_xbx.csv.gz
... 2020889873 trips_xby.csv.gz
... 2025885378 trips_xbz.csv.gz
... 2023688771 trips_xca.csv.gz
... 2024722253 trips_xcb.csv.gz
... 2030741659 trips_xcc.csv.gz
... 1383149265 trips_xcd.csv.gz

As you can see there is ~104 GB of CSV data compressed in gzip format sitting in that folder.
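
To double-check that total, the size column of the listing can be summed. The sketch below is Python rather than shell and is only illustrative: total_bytes is a hypothetical helper, and the sample holds just three of the 56 lines above, so the full listing would need to be fed in to reproduce the figure. Dividing the byte count by 1024 ** 3 appears to be the convention that matches the ~104 GB quoted here.

```python
def total_bytes(listing: str) -> int:
    """Sum the size column of `aws s3 ls` output for the gzip'ed CSV files."""
    total = 0
    for line in listing.splitlines():
        fields = line.split()
        # The size is the second-to-last field and the key name is the last.
        if len(fields) >= 2 and fields[-1].endswith(".csv.gz"):
            total += int(fields[-2])
    return total


# Three lines copied from the listing above; the date and time
# columns are elided as "..." just as they are in the post.
sample = """\
... 2024092233 trips_xaa.csv.gz
... 2023686578 trips_xab.csv.gz
... 1383149265 trips_xcd.csv.gz
"""

size = total_bytes(sample)
print(f"{size} bytes = {size / 1024 ** 3:.2f} GiB")
```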

5-node EMR Cluster Up and Running

If you haven't launched an EMR cluster before then I suggest doing so via the AWS Console. Make sure you turn on advanced options so you can pick spot instances instead of on-demand instances for your core and task nodes. When you use the EMR wizard it'll create the roles and security groups needed for your cluster nodes to communicate with one another.

Before you run the command below, generate a key pair called "emr". Download and store the emr.pem file in the ~/.ssh/ folder on your system. This file is the SSH private key that lets you access the Hadoop cluster nodes via SSH.

Below you will need to change a few items before executing the create-cluster command:

  • If the key name you generated was called anything other than emr then change the KeyName attribute.
  • The InstanceProfile is set to the name the wizard automatically generated. If you haven't used the wizard before then make sure this value matches your role.
  • The --region parameter is set to eu-west-1 and AvailabilityZone is set to eu-west-1a. Change these if this region isn't the most appropriate for you. The region should match the one where you've stored the denormalised CSV data on S3.
  • The log-uri parameter's bucket name needs to be changed to another bucket name that hasn't already been taken.
  • The BidPrice amounts are in US Dollars and were appropriate at the time I was bidding for them. Please see the current spot prices for m3.xlarge instances in your region and adjust these accordingly. There is a pricing history button on the EC2 Spot Requests page.
  • The master node needs to stay up if the cluster is to survive so it's an on-demand instance instead of a spot instance. It's also an m3.xlarge which can be considered overkill for the tasks it performs. Consider using a less expensive and less powerful EC2 instance type.

$ aws emr create-cluster \
      --applications \
        Name=Hadoop \
        Name=Hive \
        Name=Presto-Sandbox \
      --ec2-attributes '{
          "KeyName": "emr",
          "InstanceProfile": "EMR_EC2_DefaultRole",
          "AvailabilityZone": "eu-west-1a",
          "EmrManagedSlaveSecurityGroup": "sg-89cd3eff",
          "EmrManagedMasterSecurityGroup": "sg-d4cc3fa2"
      }' \
      --service-role EMR_DefaultRole \
      --release-label emr-4.3.0 \
      --log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
      --name 'A Billion Taxi Trips' \
      --instance-groups '[{
          "InstanceCount": 2,
          "BidPrice": "0.048",
          "InstanceGroupType": "CORE",
          "InstanceType": "m3.xlarge",
          "Name": "Core instance group - 2"
      }, {
          "InstanceCount": 2,
          "BidPrice": "0.048",
          "InstanceGroupType": "TASK",
          "InstanceType": "m3.xlarge",
          "Name": "Task instance group - 3"
      }, {
          "InstanceCount": 1,
          "InstanceGroupType": "MASTER",
          "InstanceType": "m3.xlarge",
          "Name": "Master instance group - 1"
      }]' \
      --region eu-west-1
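
Since the bid price and instance type are the values most likely to change between runs, the JSON passed to --instance-groups could be generated rather than edited by hand. The following is a minimal Python sketch, not part of the original workflow: instance_groups is a hypothetical helper, and the group names and counts are simply copied from the command above.

```python
import json


def instance_groups(bid_price: str, instance_type: str = "m3.xlarge") -> str:
    """Build the JSON string for the --instance-groups argument."""
    groups = [
        {
            "InstanceCount": 2,
            "BidPrice": bid_price,
            "InstanceGroupType": "CORE",
            "InstanceType": instance_type,
            "Name": "Core instance group - 2",
        },
        {
            "InstanceCount": 2,
            "BidPrice": bid_price,
            "InstanceGroupType": "TASK",
            "InstanceType": instance_type,
            "Name": "Task instance group - 3",
        },
        {
            # No BidPrice here: the master node is an on-demand instance.
            "InstanceCount": 1,
            "InstanceGroupType": "MASTER",
            "InstanceType": instance_type,
            "Name": "Master instance group - 1",
        },
    ]
    return json.dumps(groups)


print(instance_groups("0.048"))
```

The printed string can be pasted between the single quotes of the --instance-groups argument.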

Once that has executed make sure you add access for your IP address on port 22 to the ElasticMapReduce-master security group so that you can SSH into the master node.

The cluster can take 20 - 30 minutes to finish provisioning and bootstrapping all of the nodes so keep an eye on its status in the EMR console.

AWS Costs

The cluster above has four spot instances that each cost at most $0.048 / hour plus an on-demand m3.xlarge instance that costs $0.293 / hour. That brings the cost to $0.485 / hour or less, depending on the final prices of the spot instances. The price charged for EC2 instances is always rounded up to the next full hour, so if you ran the cluster for an hour and fifteen minutes you'd be charged for two hours.
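
The arithmetic can be sketched in a few lines of Python. This is only an illustration using the EC2 prices quoted in this post; the function names are hypothetical and any charges beyond those EC2 prices are not modelled.

```python
from decimal import Decimal

SPOT_BID = Decimal("0.048")    # maximum hourly price per spot instance
SPOT_INSTANCES = 4             # 2 core nodes + 2 task nodes
ON_DEMAND = Decimal("0.293")   # hourly price quoted for the master node


def hourly_ceiling() -> Decimal:
    """Upper bound on the hourly EC2 cost of the cluster."""
    return SPOT_INSTANCES * SPOT_BID + ON_DEMAND


def max_cost(runtime_minutes: int) -> Decimal:
    """Upper bound on the EC2 cost, billing every started hour in full."""
    billed_hours = -(-runtime_minutes // 60)  # integer ceiling division
    return billed_hours * hourly_ceiling()


print(hourly_ceiling())  # 0.485
print(max_cost(75))      # 0.970, i.e. two billed hours
```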

The data is 104 GB in gzip format and there will need to be another ~49 GB of S3 space used for the data in ORC format. The cost of storing around 150 GB of data on S3 with the request patterns Presto will have should be around $4.37 / month.

Accessing the Master Node

Change the EC2 hostname below to the hostname of your master node.

$ ssh -i ~/.ssh/emr.pem \
    hadoop@ec2-54-170-8-206.eu-west-1.compute.amazonaws.com

Once you've SSH'ed in you should see a banner resembling the following:

       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2015.09-release-notes/
29 package(s) needed for security, out of 44 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

[hadoop@ip-10-91-26-11 ~]$

The first command I'll run will fix the permissions on the hive logs folder.

$ sudo su -c 'mkdir -p /var/log/hive/user/hadoop &&
              chown -R hadoop /var/log/hive/user/hadoop'

Creating Tables in the Hive Metastore

I'll issue commands to hive to create tables which will represent our data being stored on S3. Make sure the bucket name used in the LOCATION directive exists but the folders orc and csv don't.

I've run the following commands inside of a screen session. If you're disconnected from the master server you can SSH back in and re-connect to your screen using the screen -r command. While you're disconnected your commands will continue to run as normal.

$ screen
$ hive
CREATE EXTERNAL TABLE trips_csv (
    trip_id                 INT,
    vendor_id               VARCHAR(3),
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT,
    trip_distance           DECIMAL(6,3),
    fare_amount             DECIMAL(6,2),
    extra                   DECIMAL(6,2),
    mta_tax                 DECIMAL(6,2),
    tip_amount              DECIMAL(6,2),
    tolls_amount            DECIMAL(6,2),
    ehail_fee               DECIMAL(6,2),
    improvement_surcharge   DECIMAL(6,2),
    total_amount            DECIMAL(6,2),
    payment_type            VARCHAR(3),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6),

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION 's3://<s3_bucket>/csv/';

CREATE EXTERNAL TABLE trips_orc (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION 's3://<s3_bucket>/orc/';
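
The two definitions share the same columns and differ only in their types and storage clause: every VARCHAR in trips_csv becomes a STRING in trips_orc and every DECIMAL becomes a DOUBLE, while the remaining types are unchanged. That mapping can be expressed as a small Python sketch. The orc_type helper is hypothetical and only covers the types that appear in the tables above.

```python
import re


def orc_type(csv_type: str) -> str:
    """Map a column type from trips_csv to the type used in trips_orc."""
    # Drop any length or precision suffix, e.g. VARCHAR(3) -> VARCHAR.
    base = re.sub(r"\(.*\)$", "", csv_type.strip()).upper()
    if base == "VARCHAR":
        return "STRING"
    if base == "DECIMAL":
        return "DOUBLE"
    return base  # INT, SMALLINT and TIMESTAMP are kept as they are


# A handful of columns taken from the trips_csv definition above.
columns = [
    ("trip_id", "INT"),
    ("vendor_id", "VARCHAR(3)"),
    ("pickup_datetime", "TIMESTAMP"),
    ("pickup_longitude", "DECIMAL(18,14)"),
    ("passenger_count", "SMALLINT"),
]

for name, csv_type in columns:
    print(f"{name:<24}{orc_type(csv_type)}")
```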

Loading 1.1 Billion Trips into ORC Format

The data is in CSV format but in order to analyse the data quickly it'll need to be converted into ORC format. When in ORC format the data is stored in a column-oriented form and each column will be compressed. The 104 GB of compressed CSV data (~500 GB uncompressed) will only be ~49 GB in size when it's in ORC format.
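
To put those sizes in proportion, here is a short Python calculation using only the approximate figures quoted above.

```python
GZIP_CSV_GB = 104   # compressed CSV, as listed in the raw folder
RAW_CSV_GB = 500    # approximate uncompressed CSV size
ORC_GB = 49         # approximate size once converted to ORC

print(f"ORC vs. uncompressed CSV: {RAW_CSV_GB / ORC_GB:.1f}x smaller")
print(f"ORC vs. gzip'ed CSV:      {GZIP_CSV_GB / ORC_GB:.1f}x smaller")
```

With these rounded inputs the ORC data comes out roughly 10x smaller than the uncompressed CSV and about 2x smaller than the gzip'ed CSV.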

$ echo "LOAD DATA INPATH 's3://<s3_bucket>/raw'
        INTO TABLE trips_csv;

        INSERT INTO TABLE trips_orc
        SELECT * FROM trips_csv;" | hive

The above command took 3 hours and 45 minutes to complete.

This is a one-off exercise. After the above finishes you shouldn't need to convert the CSV data into ORC any more and the ORC files will live on in S3 so you can use them in another EMR cluster later on.

Below is a snippet of the listing of the 192 ORC files in the S3 bucket I used.

$ aws s3 ls <s3_bucket>/orc/
2016-03-14 13:54:41  398631347 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000000
2016-03-14 13:54:40  393489828 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000001
2016-03-14 13:54:40  339863956 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000002
2016-03-14 13:54:29  319146976 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000003
....
2016-03-14 14:00:18  261801034 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000188
2016-03-14 14:00:26  266048453 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000189
2016-03-14 14:00:23  265076102 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000190
2016-03-14 14:00:12  115110402 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000191

Benchmarking Queries in Presto

The following queries ran on Presto 0.130, the version of Presto that ships with the emr-4.3.0 release used above.

$ presto-cli --catalog hive --schema default

The following completed in 1 minute and 15 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;
Query 20160314_140142_00002_gj4np, FINISHED, 4 nodes
Splits: 749 total, 749 done (100.00%)
1:15 [1.11B rows, 48.8GB] [14.8M rows/s, 667MB/s]

The following completed in 1 minute and 4 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;
Query 20160314_140314_00003_gj4np, FINISHED, 4 nodes
Splits: 749 total, 749 done (100.00%)
1:04 [1.11B rows, 48.8GB] [17.3M rows/s, 777MB/s]

The following completed in 1 minute and 23 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);
Query 20160314_140434_00006_gj4np, FINISHED, 4 nodes
Splits: 749 total, 749 done (100.00%)
1:23 [1.11B rows, 48.8GB] [13.5M rows/s, 604MB/s]

The following completed in 1 minute and 45 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
Query 20160314_140606_00007_gj4np, FINISHED, 4 nodes
Splits: 749 total, 749 done (100.00%)
1:45 [1.11B rows, 48.8GB] [10.6M rows/s, 474MB/s]
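
The rows-per-second figures Presto reports can be cross-checked against the elapsed times. The Python sketch below parses the last line of each query summary above. The rows_per_second helper is hypothetical and assumes the M:SS elapsed-time format and the K/M/B suffixes seen in this post. Because the row count is rounded to 1.11B in the summary, the recomputed rates can differ slightly from the ones Presto printed.

```python
import re

SUFFIX = {"K": 1e3, "M": 1e6, "B": 1e9}


def rows_per_second(summary: str) -> float:
    """Recompute the scan rate from a line like '1:15 [1.11B rows, ...'."""
    match = re.match(r"(\d+):(\d{2}) \[([\d.]+)([KMB]) rows", summary)
    if match is None:
        raise ValueError(f"unrecognised summary line: {summary!r}")
    minutes, seconds, value, suffix = match.groups()
    elapsed = int(minutes) * 60 + int(seconds)
    return float(value) * SUFFIX[suffix] / elapsed


# The four summary lines from the benchmark runs above.
summaries = [
    "1:15 [1.11B rows, 48.8GB] [14.8M rows/s, 667MB/s]",
    "1:04 [1.11B rows, 48.8GB] [17.3M rows/s, 777MB/s]",
    "1:23 [1.11B rows, 48.8GB] [13.5M rows/s, 604MB/s]",
    "1:45 [1.11B rows, 48.8GB] [10.6M rows/s, 474MB/s]",
]

for line in summaries:
    print(f"{line[:4]} -> {rows_per_second(line) / 1e6:.1f}M rows/s")
```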

Copyright © 2014 - 2024 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.