Posted on Fri 06 May 2016 under Databases

All 1.1 Billion Taxi Rides in Elasticsearch

A month ago Zachary Tong of Elastic reached out to me after seeing my Billion Taxi Rides in Elasticsearch blog post. He had noticed that I had included only 5 of the 51 fields from the 1.1 billion taxi trip dataset due to drive space concerns. Zachary was kind enough to put together a list of recommendations that would allow me to import every field from the dataset and still fit it on the 850 GB SSD I'd dedicated to this task.

In this blog post I'll walk through the import process again using the improvements suggested.

I'll use the same hardware as in my previous blog post: a Core i5 3.4 GHz CPU, 16 GB of RAM, an 850 GB SSD and a second 1 TB mechanical drive.

Installing Dependencies

The following was run on a fresh install of Ubuntu 14.04.3 LTS.

I'll add Elastic's repository details and keys to my system.

$ echo "deb http://packages.elastic.co/elasticsearch/2.x/debian stable main" | \
    sudo tee /etc/apt/sources.list.d/elasticsearch.list
$ echo "deb http://packages.elastic.co/logstash/2.2/debian stable main" | \
    sudo tee /etc/apt/sources.list.d/logstash.list
$ gpg --keyserver pgp.mit.edu --recv-keys D88E42B4
$ gpg --armor --export D88E42B4 | sudo apt-key add -

I'll then add in the repository details needed to install Oracle's Java distribution.

$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt update

The following will install Java 7, Elasticsearch 2.1.1 (which uses Lucene 5.3.1) and Logstash 2.2.

$ sudo apt install \
    elasticsearch=2.1.1 \
    logstash \
    oracle-java7-installer

Elasticsearch Up & Running

The application configuration for Elasticsearch will stay the same as before. The reasoning behind each of these configuration options is described here.

$ sudo vi /etc/elasticsearch/elasticsearch.yml
bootstrap.mlockall: true
cluster.routing.allocation.disk.threshold_enabled: true
cluster.routing.allocation.disk.watermark.low: .98
cluster.routing.allocation.disk.watermark.high: .99
index.codec: best_compression
index.number_of_replicas: 0
index.refresh_interval: -1
indices.fielddata.cache.size: 25%

One of the first recommendations I'll put into practice is lowering the heap size to 4 GB and removing the use of the G1 garbage collector. According to Zachary:

"G1GC is still buggy and eats more CPU, we don't really recommend it."

He went on to say:

"That's not to say G1 won't be a perfect match some day...but we don't think it's ready yet"

$ sudo vi /etc/init.d/elasticsearch
ES_HEAP_SIZE=4g

I'll install the SQL interface plugin for Elasticsearch as before.

$ sudo /usr/share/elasticsearch/bin/plugin install \
    https://github.com/NLPchina/elasticsearch-sql/releases/download/2.1.1/elasticsearch-sql-2.1.1.zip

The Dataset

I'll be using the compressed CSV files I generated in my Billion Taxi Rides in Redshift blog post. The system I'm using has two drives. The first, the SSD, contains the OS, applications (including Elasticsearch) and dependencies, while the second, the mechanical drive, will hold ~500 GB of uncompressed CSV data.

The gzip files live in the /one_tb_drive/taxi-trips/ folder and will be decompressed there as well. Normally I would not decompress this data and would instead feed the compressed data into Elasticsearch, but Logstash doesn't support working with gzip data.

$ cd /one_tb_drive/taxi-trips/
$ gunzip *.gz

Optimising Disk Usage

I'll create a template of mappings for Elasticsearch that will be applied to any index whose name starts with "trip". These mappings remove a lot of full-text functionality in order to save disk space. The changes to the out-of-the-box defaults include a single primary shard, no replicas, automatic refreshes disabled, no _all or _source fields, no inverted index for string fields and all doubles stored as floats. The translog flush threshold was pushed up to 1 GB to lower the overhead of flushing to segments during the import.

$ curl -XPUT localhost:9200/_template/pure_analytics -d '
  {
      "template": "trip*",
      "settings": {
          "number_of_shards": 1,
          "number_of_replicas": 0,
          "index.translog.flush_threshold_size": "1g",
          "index.refresh_interval": -1
      },
      "mappings": {
          "_default_": {
              "_all": {
                  "enabled": false
              },
              "_source": {
                  "enabled": false
              },
              "dynamic_templates": [{
                  "doubles": {
                      "match_mapping_type": "double",
                      "mapping": {
                          "type": "float"
                      }
                  }
              }, {
                  "strings": {
                      "match_mapping_type": "string",
                      "mapping": {
                          "type": "string",
                          "index": "no",
                          "doc_values": true
                      }
                  }
              }],
              "properties": {
                  "@timestamp": {
                      "type": "date"
                  },
                  "@version": {
                      "type": "string",
                      "index": "not_analyzed"
                  },
                  "pickup_datetime": {
                      "type": "date"
                  },
                  "dropoff_datetime": {
                      "type": "date"
                  }
              }
          }
      }
  }'
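
To make the two dynamic templates a little more concrete, here is a minimal Python sketch of how a first-match lookup on match_mapping_type might play out for fields that aren't listed under "properties". It is a simplified illustration and not Elasticsearch's actual implementation; detect_type and resolve_mapping are hypothetical helpers I've made up for this example.

```python
# Simplified illustration of how the dynamic templates in the mapping
# above get picked. This is NOT Elasticsearch's implementation.

DYNAMIC_TEMPLATES = [
    {"doubles": {"match_mapping_type": "double",
                 "mapping": {"type": "float"}}},
    {"strings": {"match_mapping_type": "string",
                 "mapping": {"type": "string",
                             "index": "no",
                             "doc_values": True}}},
]


def detect_type(value):
    """Rough stand-in for JSON type detection (hypothetical helper)."""
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    return "string"


def resolve_mapping(value):
    """Return the mapping of the first template whose type matches."""
    detected = detect_type(value)
    for template in DYNAMIC_TEMPLATES:
        for rule in template.values():
            if rule["match_mapping_type"] == detected:
                return rule["mapping"]
    # No template matched, so fall back to the detected type.
    return {"type": detected}


print(resolve_mapping(12.5))      # {'type': 'float'}
print(resolve_mapping("yellow"))  # {'type': 'string', 'index': 'no', 'doc_values': True}
print(resolve_mapping(3))         # {'type': 'long'}
```

Floating-point values end up stored as 4-byte floats rather than 8-byte doubles, and strings keep their doc values for aggregations while skipping the inverted index.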

Importing a Billion Trips

I'll be using Logstash to import the data into Elasticsearch. Below is the configuration file I used for this job.

$ vi ~/trips.conf
input {
    file {
        path => "/one_tb_drive/taxi-trips/*.csv"
        type => "trip"
        start_position => "beginning"
    }
}

filter {
    csv {
        columns => ["trip_id",
                    "vendor_id",
                    "pickup_datetime",
                    "dropoff_datetime",
                    "store_and_fwd_flag",
                    "rate_code_id",
                    "pickup_longitude",
                    "pickup_latitude",
                    "dropoff_longitude",
                    "dropoff_latitude",
                    "passenger_count",
                    "trip_distance",
                    "fare_amount",
                    "extra",
                    "mta_tax",
                    "tip_amount",
                    "tolls_amount",
                    "ehail_fee",
                    "improvement_surcharge",
                    "total_amount",
                    "payment_type",
                    "trip_type",
                    "pickup",
                    "dropoff",
                    "cab_type",
                    "precipitation",
                    "snow_depth",
                    "snowfall",
                    "max_temperature",
                    "min_temperature",
                    "average_wind_speed",
                    "pickup_nyct2010_gid",
                    "pickup_ctlabel",
                    "pickup_borocode",
                    "pickup_boroname",
                    "pickup_ct2010",
                    "pickup_boroct2010",
                    "pickup_cdeligibil",
                    "pickup_ntacode",
                    "pickup_ntaname",
                    "pickup_puma",
                    "dropoff_nyct2010_gid",
                    "dropoff_ctlabel",
                    "dropoff_borocode",
                    "dropoff_boroname",
                    "dropoff_ct2010",
                    "dropoff_boroct2010",
                    "dropoff_cdeligibil",
                    "dropoff_ntacode",
                    "dropoff_ntaname",
                    "dropoff_puma"]
        separator => ","
    }

    date {
        match => ["pickup_datetime", "YYYY-MM-dd HH:mm:ss"]
        timezone => "America/New_York"
        target => "pickup_datetime"
    }

    date {
        match => ["dropoff_datetime", "YYYY-MM-dd HH:mm:ss"]
        timezone => "America/New_York"
        target => "dropoff_datetime"
    }

    mutate {
        convert => {
            "trip_id"               => "integer"
            "vendor_id"             => "string"
            "store_and_fwd_flag"    => "string"
            "rate_code_id"          => "integer"
            "pickup_longitude"      => "float"
            "pickup_latitude"       => "float"
            "dropoff_longitude"     => "float"
            "dropoff_latitude"      => "float"
            "passenger_count"       => "integer"
            "trip_distance"         => "float"
            "fare_amount"           => "float"
            "extra"                 => "float"
            "mta_tax"               => "float"
            "tip_amount"            => "float"
            "tolls_amount"          => "float"
            "ehail_fee"             => "float"
            "improvement_surcharge" => "float"
            "total_amount"          => "float"
            "payment_type"          => "string"
            "trip_type"             => "integer"
            "pickup"                => "string"
            "dropoff"               => "string"
            "cab_type"              => "string"
            "precipitation"         => "integer"
            "snow_depth"            => "integer"
            "snowfall"              => "integer"
            "max_temperature"       => "integer"
            "min_temperature"       => "integer"
            "average_wind_speed"    => "integer"
            "pickup_nyct2010_gid"   => "integer"
            "pickup_ctlabel"        => "string"
            "pickup_borocode"       => "integer"
            "pickup_boroname"       => "string"
            "pickup_ct2010"         => "string"
            "pickup_boroct2010"     => "string"
            "pickup_cdeligibil"     => "string"
            "pickup_ntacode"        => "string"
            "pickup_ntaname"        => "string"
            "pickup_puma"           => "string"
            "dropoff_nyct2010_gid"  => "integer"
            "dropoff_ctlabel"       => "string"
            "dropoff_borocode"      => "integer"
            "dropoff_boroname"      => "string"
            "dropoff_ct2010"        => "string"
            "dropoff_boroct2010"    => "string"
            "dropoff_cdeligibil"    => "string"
            "dropoff_ntacode"       => "string"
            "dropoff_ntaname"       => "string"
            "dropoff_puma"          => "string"
        }
    }
}

output {
    elasticsearch {
        action => "index"
        hosts => "localhost:9200"
        index => "trips"
        flush_size => 20000
    }
}
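
As a rough picture of what the csv and mutate steps above do to each line, here is a small Python sketch. It uses an abbreviated, hypothetical column list and a made-up sample row rather than real data, it leaves out the date filter entirely, and its handling of empty values is my own assumption rather than a description of Logstash's behaviour.

```python
import csv
import io

# Abbreviated column list; the real configuration names all 51 columns.
COLUMNS = ["trip_id", "vendor_id", "pickup_datetime", "passenger_count",
           "total_amount", "cab_type"]

# Target types for the abbreviated list; unlisted columns stay strings.
CONVERSIONS = {"trip_id": int, "passenger_count": int, "total_amount": float}


def parse_line(line):
    """Split one CSV line into named fields and convert their types."""
    values = next(csv.reader(io.StringIO(line)))
    event = dict(zip(COLUMNS, values))
    for field, convert in CONVERSIONS.items():
        if event.get(field):  # assumption: empty values are left untouched
            event[field] = convert(event[field])
    return event


# Made-up sample row, not taken from the dataset.
sample = "1,CMT,2015-01-15 19:05:39,2,17.05,yellow"
print(parse_line(sample))
```

Converting the numeric columns before they reach Elasticsearch matters here because the template's dynamic mappings are chosen from the JSON type of each value.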

The following import took 4 days and 16 hours to complete. This is 1.6x longer than the 70 hours it took to import the 5-field dataset.

$ screen
$ /opt/logstash/bin/logstash -f ~/trips.conf
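
The import figures above can be sanity-checked with a few lines of Python. This is only back-of-the-envelope arithmetic and it assumes the record count is exactly the rounded 1.1 billion.

```python
# Figures from the text; 1.1 billion is the rounded record count.
RECORDS = 1_100_000_000
FULL_IMPORT_HOURS = 4 * 24 + 16      # 4 days and 16 hours
FIVE_FIELD_IMPORT_HOURS = 70

slowdown = FULL_IMPORT_HOURS / FIVE_FIELD_IMPORT_HOURS
records_per_second = RECORDS / (FULL_IMPORT_HOURS * 3600)

print(FULL_IMPORT_HOURS)             # 112
print(round(slowdown, 1))            # 1.6
print(round(records_per_second))     # 2728
```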

It Fits On The SSD!

The point of this exercise was to fit the ~500 GB of CSV data into Elasticsearch on a single SSD. In the previous exercise I called the _optimize endpoint a lot to try and save space during importing. I was advised against doing this and I never called it during this import.

When I started the import, 17 GB of space on the SSD was in use (this included Ubuntu, Elasticsearch, etc.). By the time I had imported 200 million records only 134 GB was in use, and at 400 million records 263 GB was in use. In other words, the share of records imported was staying ahead of the share of the drive being consumed. When all 1.1 billion records were imported, 705 GB of space on the SSD was in use.
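
The checkpoints above work out to a fairly steady cost per record. The sketch below does the arithmetic, assuming decimal gigabytes, that everything beyond the initial 17 GB belongs to the index, and that the final count is exactly the rounded 1.1 billion.

```python
GB = 10 ** 9            # assumes decimal gigabytes
BASELINE_GB = 17        # SSD usage before the import began

# (records imported, GB used on the SSD) checkpoints from the text
checkpoints = [
    (200_000_000, 134),
    (400_000_000, 263),
    (1_100_000_000, 705),   # 1.1 billion is the rounded record count
]


def bytes_per_record(records, used_gb):
    """Approximate on-disk bytes per imported record."""
    return (used_gb - BASELINE_GB) * GB / records


for records, used_gb in checkpoints:
    print(records, round(bytes_per_record(records, used_gb)))
# 200000000 585
# 400000000 615
# 1100000000 625
```

By this estimate each record costs somewhere around 585 to 625 bytes on disk with all 51 fields included.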

Benchmarking Queries in Elasticsearch

The following completed in 34.48 seconds (4.2x slower than in the previous benchmark).

SELECT cab_type,
       count(*)
FROM trips
GROUP BY cab_type

The following completed in 63.3 seconds (3.5x slower than in the previous benchmark).

SELECT passenger_count,
       avg(total_amount)
FROM trips
GROUP BY passenger_count

The following completed in 99.65 seconds (4x slower than in the previous benchmark).

SELECT passenger_count,
       count(*) trips
FROM trips
GROUP BY passenger_count,
         date_histogram(field='pickup_datetime',
                              'interval'='year',
                              'alias'='year')
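
The SQL plugin turns statements like these into Elasticsearch aggregations. The Python sketch below builds hand-written request bodies that I'd expect to be roughly equivalent to the three queries above. The exact bodies the plugin generates may well differ (bucket sizes and aggregation names, for instance), and the helper names are my own.

```python
import json


def group_by_count(field):
    """Terms aggregation: one bucket per distinct value, with doc counts."""
    return {"size": 0, "aggs": {field: {"terms": {"field": field}}}}


def group_by_avg(group_field, value_field):
    """Terms aggregation with an avg sub-aggregation in each bucket."""
    body = group_by_count(group_field)
    body["aggs"][group_field]["aggs"] = {
        "avg_" + value_field: {"avg": {"field": value_field}}
    }
    return body


def group_by_year(group_field, date_field):
    """Terms aggregation with a yearly date_histogram in each bucket."""
    body = group_by_count(group_field)
    body["aggs"][group_field]["aggs"] = {
        "year": {"date_histogram": {"field": date_field, "interval": "year"}}
    }
    return body


print(json.dumps(group_by_count("cab_type"), indent=2))
print(json.dumps(group_by_avg("passenger_count", "total_amount"), indent=2))
print(json.dumps(group_by_year("passenger_count", "pickup_datetime"), indent=2))
```

All three are pure aggregations with "size" set to 0, so no individual documents need to be returned, which fits a template that has the _source field disabled.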

I'm starting to believe that the best way to store data in Elasticsearch is to be very particular about which fields are stored rather than treating it as a dumping ground. Sharding by columns (if that's an option) and definitely by key will be a huge help as imports take a long time compared to every other data store I've used in an OLAP capacity.

That doesn't mean that Elasticsearch is slow or incapable. In fact, it supports queries that would be next to impossible to run on other systems, and its response times under load from a large number of concurrent users are impressive to say the least.

Copyright © 2014 - 2024 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.