
Posted on Sun 17 September 2017 under Databases

1.1 Billion Taxi Rides with Spark 2.2 & 3 Raspberry Pi 3 Model Bs

The Raspberry Pi is a £29, UK-built, single-board computer. To date more than 12.5 million units have been sold. In this benchmark I'll use three Raspberry Pis, a few Micro SD cards and an old 7200 RPM hard drive and see what sort of query performance Spark 2.2 can achieve on a cluster of these devices.

The dataset I'll be using is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put the dataset together. This is the same dataset I've used to benchmark Amazon Athena, BigQuery, BrytlytDB, ClickHouse, Elasticsearch, EMR, kdb+/q, MapD, PostgreSQL, Redshift and Vertica. I have a single-page summary of all these benchmarks for comparison.

The Pi's Performance

The model of the device I'm using in this blog, the 3 Model B Rev 1.2, comes with a quad-core, 64-bit, 1.2 GHz ARM Cortex-A53 CPU (ARMv8, though 32-bit Raspbian reports it as ARMv7), 1 GB of memory, a built-in Micro SD card reader the device can boot from and a wireless adaptor. The device can be run headless, with Wi-Fi and SSH access configured via files placed on the Micro SD card it boots from.

The CPU and memory don't tell the full story of how quickly the system can interact with storage and networking. For a start, the USB ports, which are handy for plugging in additional storage, share a single USB 2.0 bus, and its bandwidth, with the on-board Ethernet adaptor (the smsc95xx device below).

$ lsusb -t
/:  Bus 01.Port 1: Dev 1, Class=root_hub, Driver=dwc_otg/1p, 480M
    |__ Port 1: Dev 2, If 0, Class=Hub, Driver=hub/5p, 480M
        |__ Port 1: Dev 3, If 0, Class=Vendor Specific Class, Driver=smsc95xx, 480M

The USB ports are version 2.0, which signals at 480 Mbit/s and works out to roughly 53 MB/s of usable throughput once protocol overhead is taken into account. I plugged in a 2.5" Western Digital Black 500 GB 7200 RPM disk drive via a SATA 3-to-USB converter and found I could reach a little shy of 47 MB/s.

$ sudo dd if=/dev/zero of=/mnt/usb/test bs=500K count=1024
524288000 bytes (524 MB, 500 MiB) copied, 11.2016 s, 46.8 MB/s

The various Micro SD cards I used as boot and storage drives have been benchmarked elsewhere at speeds of up to 78 MB/s, but when I benchmarked them in the Raspberry Pi's built-in card reader I couldn't write much past the 10 MB/s mark and reads were mostly under 25 MB/s.

$ dd if=/dev/zero of=~/test bs=500K count=1024
524288000 bytes (524 MB, 500 MiB) copied, 62.7941 s, 8.3 MB/s
$ sync
$ echo 3 | sudo tee /proc/sys/vm/drop_caches
$ dd if=~/test of=/dev/null bs=500K count=1024
524288000 bytes (524 MB, 500 MiB) copied, 23.6529 s, 22.2 MB/s

The good thing about the Hadoop ecosystem is that workloads can be scaled horizontally across many machines, with jobs processing data at the aggregate rate of all the machines in the cluster. Adding more Raspberry Pis and storage devices to the cluster should therefore speed up query times.

A Raspberry Pi Cluster

I will be using three Raspberry Pi 3 Model Bs in this benchmark. They're named r1, r2 and r3.

The r1 device has a 128 GB SanDisk Micro SD card that it'll use for the OS, applications and HDFS storage. I found I couldn't reach more than 3.2 MB/s when transferring data to the Raspberry Pi via SSH, so I've attached, via a SATA 3-to-USB converter, a 2.5" Western Digital Black 500 GB 7200 RPM disk drive that holds a copy of the taxi trips dataset.

The r2 device has a 32 GB Micro SD card in the on-board slot and a 64 GB SanDisk Micro SD card plugged in via a USB adaptor. The dataset is 86 GB in ORC format and each Raspberry Pi will hold its own copy of it to improve data locality when querying. The data is stored on an HDFS cluster spread across the three Raspberry Pis; HDFS supports using more than one storage device on each machine. Using both the 32 GB and the 64 GB Micro SD cards means 96 GB of pre-formatted capacity is available for the OS, Hadoop and HDFS storage, which should be just enough for everything to fit.

The r3 device also has a 32 GB Micro SD card in the on-board slot and a 64 GB SanDisk Micro SD card plugged in via a USB adaptor.

I used a USB-powered fan to cool the machines. I've posted a photo of the cluster on Twitter.

Bootstrapping the Pis

On each Micro SD card used to boot the Raspberry Pis I've used Etcher to write an image of the August 16th release of Raspbian Stretch Lite, a minimalist Linux distribution for the Raspberry Pi based on Debian Stretch. The image is 348 MB compressed and expands to 1.8 GB. Etcher should work well on Windows, Linux and macOS.
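
If you'd rather flash the cards from the command line, something along the following lines should also do the job on macOS. This is a hedged sketch: the /dev/disk2 identifier is just an example and needs to match whatever diskutil reports for your card, and the image filename is the one I'd expect the August 16th release to unpack to.

$ diskutil list
$ diskutil unmountDisk /dev/disk2
$ sudo dd if=2017-08-16-raspbian-stretch-lite.img \
    of=/dev/rdisk2 \
    bs=1m
$ diskutil eject /dev/disk2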

Once the image is on each of the Micro SD cards I enable SSH access by creating an empty ssh file on the boot partition.

$ touch /Volumes/boot/ssh

I also include Wi-Fi connection details which will be picked up and used during boot-up.

$ vi /Volumes/boot/wpa_supplicant.conf
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
update_config=1

network={
        ssid="<network name>"
        psk="<password>"
}

About six seconds after I plugged the power cable into each Raspberry Pi I could see devices with the MAC address prefix B8:27:EB appear in my router's ARP table. This prefix is registered to the Raspberry Pi Foundation, which makes the Pis easy to distinguish from other devices on the network. The ARP listing is how I found the IP addresses of the Raspberry Pis.
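
If your router doesn't make its ARP table easy to browse, checking the ARP cache of another machine on the same network works too. A small, hedged example run from a laptop that has already exchanged traffic with the Pis (pinging the subnet beforehand helps populate the cache):

$ arp -a | grep -i 'b8:27:eb'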

By default the username to SSH in is pi and the password is raspberry. When you SSH in you'll be given the following message:

SSH is enabled and the default password for the 'pi' user has not been changed.
This is a security risk - please login as the 'pi' user and type 'passwd' to set a new password.

This will cause issues with rsync, where you'll get errors like the following:

protocol version mismatch -- is your shell clean?
(see the rsync man page for an explanation)
rsync error: protocol incompatibility (code 2) at compat.c(178) [sender=3.1.2]

So if you don't change the default password, you'll need to remove the warning script to stop the message from disturbing rsync.

sudo rm /etc/profile.d/sshpwd.sh
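
A quick way to confirm the remote shell is now "clean", borrowed from the advice in the rsync documentation, is to run a no-op command over SSH and check that it produces no output. Treat this as a hedged sanity check; r2 below is just one of the hosts in this cluster and out.dat is a throwaway scratch file.

$ ssh pi@r2 /bin/true > out.dat
$ wc -c out.dat

If out.dat is anything other than zero bytes long, the login is still emitting text that will break rsync.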

Installing HDFS, Hive & Spark

This is the list of prerequisite packages I installed. iotop and nethogs are for telemetry and are optional. mysql-server is only used on r1, so you'll save yourself some memory and CPU cycles by not installing it on r2 and r3. The Micro SD cards and the 500 GB disk drive plugged in via the USB ports are formatted with the exFAT file system, which isn't supported out of the box by Raspbian, so exfat-fuse and exfat-utils are needed in order to interact with them.

sudo apt update
sudo apt install \
    exfat-fuse \
    exfat-utils \
    iotop \
    mysql-server \
    nethogs \
    oracle-java8-jdk

On r1 I'll create a user for Hive in MySQL / MariaDB.

$ sudo su
$ mysql -uroot
CREATE USER 'hive'@'localhost' IDENTIFIED BY 'hive';
GRANT ALL PRIVILEGES ON *.* TO 'hive'@'localhost';
FLUSH PRIVILEGES;
$ exit

Hadoop needs to refer to the other nodes in the cluster by hostname, so I'll add them to the hosts file on all three devices.

$ sudo vi /etc/hosts
192.168.0.16 r1
192.168.0.22 r2
192.168.0.25 r3

To ease memory pressure I'll expand the 100 MB swap file to 2,000 MB by changing the CONF_SWAPSIZE setting in /etc/dphys-swapfile on all three devices.

$ sudo vi /etc/dphys-swapfile
CONF_SWAPSIZE=2000

I'll then restart each of the devices so they pick up the swap file change.

$ sudo reboot
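
Once each device is back up, a quick check that the larger swap file was picked up:

$ free -m

The Swap line should report roughly 2,000 MB of total swap space.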

By default Hadoop uses the root account to SSH onto each of the nodes in the cluster. I'll create SSH keys to make sure this is a password-less process. On r1 I'll generate a new key pair and add it to the authorized keys of r1's root account.

$ sudo su
$ ssh-keygen
$ cp /root/.ssh/id_rsa.pub \
     /root/.ssh/authorized_keys

There is no SSH password for the root account on r2 or r3, so I'll copy r1's root public key to the authorized_keys file of the pi user on those devices.

$ ssh-copy-id pi@r2
$ ssh-copy-id pi@r3
$ exit

Then on r2 and r3 I'll bootstrap the .ssh folder for the root account on those machines and copy the authorized_keys file from the pi user's .ssh folder so the root user accepts r1's key as well.

$ sudo su
$ ssh-keygen
$ cp /home/pi/.ssh/authorized_keys \
     /root/.ssh/authorized_keys
$ exit
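
At this point root on r1 should be able to SSH to r2 and r3 without a password. A hedged sanity check, run from r1, which also gets the host key prompts out of the way before Hadoop's scripts need to connect:

$ sudo ssh root@r2 hostname
$ sudo ssh root@r3 hostname

Each command should print the remote hostname without asking for a password.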

A number of settings will be used by HDFS, Hive and Spark, and by both the root and pi user accounts. To centralise these settings I've stored them in /etc/profile and created a symbolic link from /root/.bashrc to that file. That way all users share the same settings and they can be managed centrally on each device.

$ sudo vi /etc/profile
if [ "${PS1-}" ]; then
  if [ "${BASH-}" ] && [ "$BASH" != "/bin/sh" ]; then
    # The file bash.bashrc already sets the default PS1.
    # PS1='\h:\w\$ '
    if [ -f /etc/bash.bashrc ]; then
      . /etc/bash.bashrc
    fi
  else
    if [ "`id -u`" -eq 0 ]; then
      PS1='# '
    else
      PS1='$ '
    fi
  fi
fi

if [ -d /etc/profile.d ]; then
  for i in /etc/profile.d/*.sh; do
    if [ -r $i ]; then
      . $i
    fi
  done
  unset i
fi

export HADOOP_HOME=/opt/hadoop
export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:/opt/hive/bin:/opt/spark/bin
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
export SPARK_HOME=/opt/spark
export SPARK_CONF_DIR=/opt/spark/conf
export SPARK_MASTER_HOST=r1
export JAVA_HOME=/usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt/jre
$ sudo ln -sf /etc/profile \
              /root/.bashrc
$ source /etc/profile

On r1 I'll create the folders needed by the various Hadoop tools used in this benchmark.

$ sudo mkdir -p /opt/{hadoop,hdfs/{datanode,namenode},hive,spark}

The 500 GB hard drive is connected to r1 and appears as /dev/sda1. I'll mount it at /mnt/usb.

$ sudo mkdir -p /mnt/usb
$ sudo mount /dev/sda1 /mnt/usb

On r2 and r3 I'll mount the Micro SD cards that are plugged in via USB adaptors (the cards in the built-in slots are already mounted).

$ sudo mkdir -p /mnt/usb
$ sudo mount /dev/sda1 /mnt/usb
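
None of these mounts persist across a reboot. If you want them to, an /etc/fstab entry along the following lines should do the trick on each device. This is a hedged sketch: adjust the device name if your drive doesn't appear as /dev/sda1.

$ sudo vi /etc/fstab
/dev/sda1  /mnt/usb  exfat  defaults,nonempty  0  0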

I'll then create the application folders and the two data node folders HDFS will use for heterogeneous storage.

$ sudo mkdir -p /opt/{hadoop,hdfs/datanode,spark} /mnt/usb/hdfs/datanode

Next I'll download the three Hadoop software packages onto r1.

$ DIST=http://www-eu.apache.org/dist
$ wget -c -O hadoop.tar.gz $DIST/hadoop/common/hadoop-2.8.1/hadoop-2.8.1.tar.gz
$ wget -c -O hive.tar.gz   $DIST/hive/hive-2.3.0/apache-hive-2.3.0-bin.tar.gz
$ wget -c -O spark.tgz     $DIST/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz

Hadoop is 405 MB in size when compressed, Hive is 221 MB and Spark is 194 MB. Hadoop expands to 2.1 GB but 1.9 GB of that is documentation so I'll exclude the docs from the extraction.

$ sudo tar xvf hadoop.tar.gz \
      --directory=/opt/hadoop \
      --exclude=hadoop-2.8.1/share/doc \
      --strip 1

Hive is 172 MB decompressed but 102 MB of that is unit tests so I'll exclude those from extraction.

$ sudo tar xvf hive.tar.gz \
      --directory=/opt/hive \
      --exclude=apache-hive-2.3.0-bin/ql/src/test \
      --strip 1

The following will extract Spark to its installation folder.

$ sudo tar xzvf spark.tgz \
    --directory=/opt/spark \
    --strip 1

I'll specify the master and slaves for the HDFS cluster. r1 will serve as both a master and a slave so that all the Raspberry Pis will be busy when processing workloads.

$ sudo vi /opt/hadoop/etc/hadoop/master
r1
$ sudo vi /opt/hadoop/etc/hadoop/slaves
r1
r2
r3

I'll then create two files with the configuration overrides needed for this HDFS cluster. I'll be setting a default replication factor of 3 for all the files stored on HDFS so that they're copied onto each machine in full. r2 and r3 use multiple storage folders, and to avoid filling the Micro SD cards shared by HDFS and the OS I've told HDFS to keep 3 GB of space on each partition reserved before it writes any blocks to it.

$ sudo vi /opt/hadoop/etc/hadoop/core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://r1:9000/</value>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://r1:9000/</value>
    </property>
</configuration>
$ sudo vi /opt/hadoop/etc/hadoop/hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/opt/hdfs/datanode</value>
        <final>true</final>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/opt/hdfs/namenode</value>
        <final>true</final>
    </property>
    <property>
        <name>dfs.namenode.http-address</name>
        <value>r1:50070</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.datanode.du.reserved</name>
        <value>3221225472</value>
    </property>
</configuration>

I'll then sync Hadoop's binaries and configuration onto the other two Raspberry Pis.

$ for SERVER in r2 r3
  do
      sudo rsync --archive \
                 --one-file-system \
                 --partial \
                 --progress \
                 --compress \
                 /opt/hadoop/ $SERVER:/opt/hadoop/
  done

On r2 and r3 I'll adjust the HDFS configuration to include both storage folders.

$ sudo vi /opt/hadoop/etc/hadoop/hdfs-site.xml
<property>
    <name>dfs.datanode.data.dir</name>
    <value>/mnt/usb/hdfs/datanode,/opt/hdfs/datanode</value>
    <final>true</final>
</property>

At this point I'll need to load an interactive root shell in order to run three commands.

$ sudo su

The first command will format the HDFS name node.

$ hdfs namenode -format

The next will launch HDFS across the whole cluster. This command will SSH as the root user into each device.

$ start-dfs.sh

The third command sets permissive access for the pi user on HDFS.

$ hdfs dfs -chown pi /

Once that's all done I can check the capacity available across the cluster. The first line of output below is the aggregate across the whole cluster; the remaining three lines show the capacity of each respective device.

$ hdfs dfsadmin -report | grep 'Configured Capacity'
Configured Capacity: 314337058816 (292.75 GB)
Configured Capacity: 125850886144 (117.21 GB)
Configured Capacity: 94243086336 (87.77 GB)
Configured Capacity: 94243086336 (87.77 GB)

The dataset is 84.5 GB when stored as ORC files, so there is just enough space for it on each node in the cluster.

$ exit

Hive will represent the ORC files on HDFS as a table that Spark can query using SQL. The following will configure Hive to use MySQL / MariaDB to store its metadata. This only needs to happen on r1.

$ sudo vi /opt/hive/conf/hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://localhost/metastore?createDatabaseIfNotExist=true</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>hive</value>
    </property>
    <property>
        <name>datanucleus.autoCreateSchema</name>
        <value>true</value>
    </property>
    <property>
        <name>datanucleus.fixedDatastore</name>
        <value>true</value>
    </property>
    <property>
        <name>datanucleus.autoCreateTables</name>
        <value>true</value>
    </property>
</configuration>

I'll then download the MySQL / MariaDB connector for Hive to use.

$ sudo wget -c http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.28/mysql-connector-java-5.1.28.jar \
    -P /opt/hive/lib/

I'll then initialise the schema and launch the Hive Metastore.

$ schematool -dbType mysql -initSchema
$ hive --service metastore &
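
A quick, hedged way to confirm the metastore came up properly is to ask Hive to list its databases; a fresh install should report the default database without any connection errors.

$ hive -e 'SHOW DATABASES;'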

Spark needs to know Hive's configuration settings so I'll link the configuration file into Spark's configuration folder.

$ sudo ln -s /opt/hive/conf/hive-site.xml \
             /opt/spark/conf/hive-site.xml

Spark will also need the same MySQL / MariaDB connector.

$ sudo ln -s /opt/hive/lib/mysql-connector-java-5.1.28.jar \
             /opt/spark/jars/mysql-connector-java-5.1.28.jar

When you launch pyspark, spark-submit or spark-sql, the Spark libraries from the master node are copied onto HDFS and shared amongst the worker nodes. Reading 200 MB off the Micro SD card every time one of these applications launches adds a lot of delay, so I'll package the libraries up, upload them to HDFS and point the Spark configuration at the cached jar instead.

$ jar cv0f ~/spark-libs.jar -C /opt/spark/jars/ .
$ hdfs dfs -mkdir /spark-libs
$ hdfs dfs -put ~/spark-libs.jar /spark-libs/
$ sudo vi /opt/spark/conf/spark-defaults.conf
spark.master spark://r1:7077
spark.yarn.preserve.staging.files true
spark.yarn.archive hdfs:///spark-libs/spark-libs.jar

I found that a 650 MB memory limit on the various Spark components allowed everything to run without complaint.

$ sudo vi /opt/spark/conf/spark-env.sh
SPARK_EXECUTOR_MEMORY=650m
SPARK_DRIVER_MEMORY=650m
SPARK_WORKER_MEMORY=650m
SPARK_DAEMON_MEMORY=650m

Spark jobs will run on all three Raspberry Pis.

$ sudo vi /opt/spark/conf/slaves
r1
r2
r3

With that done I'll distribute Spark and its configuration to the other nodes.

$ for SERVER in r2 r3
  do
      sudo rsync --archive \
                 --one-file-system \
                 --partial \
                 --progress \
                 --compress \
                 --exclude /opt/spark/logs \
                 /opt/spark/ $SERVER:/opt/spark/
  done

To save memory I didn't launch Spark until after I had loaded all of the data onto HDFS, but it makes sense to mention the launch commands here. They are as follows:

$ sudo /opt/spark/sbin/start-master.sh
$ sudo /opt/spark/sbin/start-slaves.sh
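
Once both scripts have run, the master's web UI on port 8080 should show all three workers registered and in the ALIVE state. A hedged check from the command line, assuming the standalone master's JSON endpoint is available:

$ curl -s http://r1:8080/json/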

Loading 1.1 Billion Trips onto HDFS

The original dataset of 1.1 billion records is ~500 GB in uncompressed CSV format. I compressed those files into 56 gzip files, which take up around 104 GB of space. Spark queries ORC-formatted data very well, and ORC's column-level zlib compression brings the dataset down to 84.5 GB. For the task of converting the dataset into ORC I followed the instructions from my 1.1 Billion Taxi Rides on AWS EMR 5.3.0 & Spark 2.1.0 benchmark. AWS EMR saved the ORC files onto S3 and I then downloaded them onto the 500 GB hard drive.
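
The conversion itself boils down to defining a table over the gzipped CSVs and writing that data out into an ORC-backed table. The following is a rough, hedged sketch of that step in Hive rather than the exact job from the EMR post; the trips_csv table (covering the CSV files) is assumed to already exist and trips_orc_copy is just a placeholder name.

$ hive
CREATE TABLE trips_orc_copy
    STORED AS orc
    AS SELECT *
       FROM trips_csv;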

Wi-Fi on the Raspberry Pis was never as quick as plugging storage directly into the devices. HDFS still needs to replicate data over Wi-Fi, but I could at least get the data onto r1 quickly and then let the fault-tolerant replication process take its time.

The following mounted the 500 GB drive onto r1 and copied the data stored in the orcs folder onto HDFS.

$ sudo mkdir -p /mnt/usb
$ sudo mount /dev/sda1 /mnt/usb
$ hdfs dfs -mkdir /trips
$ hdfs dfs -copyFromLocal /mnt/usb/orcs/* /trips/

The data took around four hours to copy onto r1. During that time r1 was replicating the data onto r2 and r3, though replication between machines never broke past 1.1 MB/s. There were a few occasions where one of the Raspberry Pis would lose connectivity and I'd have to restart the device, and then the HDFS cluster, to continue replication. In all I spent a day and a half watching the replication process.

At one point I could see the primary drives on two of the Raspberry Pis had filled up completely; the 3 GB reserve of free space I had set appears to have been ignored. The following are the disk space reports from all three Raspberry Pis.

Filesystem 1K-blocks Used Available Use% Mounted on
/dev/root  118G      91G  22G       81%  /
Filesystem 1K-blocks Used Available Use% Mounted on
/dev/root  29G       28G  0         100% /
/dev/sda1  60G       57G  3.1G       95% /mnt/usb
Filesystem 1K-blocks Used Available Use% Mounted on
/dev/root  29G       28G  8.4M      100% /
/dev/sda1  60G       57G  3.1G       95% /mnt/usb

I checked the /trips/ folder on HDFS and all but 53 of the 704 blocks had been fully replicated. This means that when querying the full dataset the devices may need to ask one another for some blocks. I wasn't sure I could get the remaining 53 blocks to replicate properly so I accepted this and pushed on with the benchmark.

$ hdfs fsck /trips/
...
......Status: HEALTHY
 Total size:    90808380700 B
 Total dirs:    1
 Total files:   56
 Total symlinks:                0
 Total blocks (validated):      704 (avg. block size 128989177 B)
 Minimally replicated blocks:   704 (100.0 %)
 Over-replicated blocks:        0 (0.0 %)
 Under-replicated blocks:       53 (7.528409 %)
 Mis-replicated blocks:         0 (0.0 %)
 Default replication factor:    3
 Average block replication:     2.8551137
 Corrupt blocks:                0
 Missing replicas:              102 (4.8295455 %)
 Number of data-nodes:          3
 Number of racks:               1
FSCK ended at Sun Sep 17 12:24:03 UTC 2017 in 45 milliseconds


The filesystem under path '/trips' is HEALTHY
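
Had I wanted to force the remaining blocks up to three replicas, re-applying the replication factor and letting HDFS work through the backlog would probably have been the way to go. A hedged example I didn't end up running as part of this benchmark:

$ hdfs dfs -setrep -w 3 /trips/

The -w flag makes the command wait until the requested replication is reached, which could have taken a long time over Wi-Fi.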

With the data on HDFS, albeit not completely replicated, I created the database table that would represent the ORC files to Spark.

$ hive
CREATE EXTERNAL TABLE trips_orc (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION '/trips/';

Benchmarking Spark

The times quoted below are the lowest query times seen during a series of runs. As with all my benchmarks, I use the lowest query time as a way of indicating "top speed".

$ spark-sql \
    --master spark://r1:7077 \
    --num-executors 3

The following completed in 18 minutes and 23.73 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 19 minutes and 58.59 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The following completed in 37 minutes and 58.121 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 1 hour, 47 minutes and 25.98 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
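
The queries above were run from the interactive spark-sql shell. If you'd rather script the runs, spark-sql also accepts a query via -e; a hedged example that repeats the first query three times so the fastest run can be recorded (spark-sql reports a "Time taken" figure after each query):

$ for i in 1 2 3
  do
      spark-sql \
          --master spark://r1:7077 \
          --num-executors 3 \
          -e "SELECT cab_type, count(*) FROM trips_orc GROUP BY cab_type;"
  done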

Telemetry from the Benchmark

I captured a few pieces of telemetry while the queries were running that I found interesting.

The load averages across the cluster show the CPUs were very busy and most likely a bottleneck throughout much of the workload.

3.78, 1.89, 0.97
3.91, 1.50, 0.59
4.02, 1.53, 0.61

For the most part there was near radio silence between the devices. Inter-node traffic wasn't the bottleneck I feared the incomplete replication would make it.

PID   USER  PROGRAM                DEV    SENT   RECEIVED
5574  pi    ..sr/lib/jvm/jdk-8-or  wlan0  4.886   6.446 KB/sec
25771 root  ..sr/lib/jvm/jdk-8-or  wlan0  4.927  10.085 KB/sec
25066 root  ..sr/lib/jvm/jdk-8-or  wlan0  2.890   9.026 KB/sec

There was memory pressure, but it wasn't as bad on r2 and r3 as it was on r1.

KiB Mem :   949572 total,    31156 free,   719220 used,   199196 buff/cache
KiB Swap:  2047996 total,  1370452 free,   677544 used.   178268 avail Mem
KiB Mem :   949572 total,    29236 free,   455132 used,   465204 buff/cache
KiB Swap:  2047996 total,  1983956 free,    64040 used.   426000 avail Mem
KiB Mem :   949572 total,    28772 free,   465396 used,   455404 buff/cache
KiB Swap:  2047996 total,  1998696 free,    49300 used.   415204 avail Mem

Reading and decompressing the ORC files were the main bottlenecks. Still, I'm impressed these cheap machines could do the job.

Total DISK READ :       5.87 M/s | Total DISK WRITE :       0.00 B/s
Actual DISK READ:      12.22 M/s | Actual DISK WRITE:    1551.84 K/s
  TID  PRIO  USER    DISK READ>  DISK WRITE  SWAPIN      IO    COMMAND
 5590 be/4 pi          433.00 M    100.00 K 12.19 %  0.00 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~r.SparkSQLCLIDriver --num-executors 3 spark-internal
 5757 be/4 pi           32.13 M      4.00 K  1.07 %  0.00 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~r.SparkSQLCLIDriver --num-executors 3 spark-internal
 5594 be/4 pi           26.69 M     12.00 K  1.59 %  0.01 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~r.SparkSQLCLIDriver --num-executors 3 spark-internal
 5761 be/4 pi           21.22 M      0.00 B  1.20 %  0.00 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~r.SparkSQLCLIDriver --num-executors 3 spark-internal
 5755 be/4 pi           19.81 M      0.00 B  1.03 %  0.00 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~r.SparkSQLCLIDriver --num-executors 3 spark-internal
 5679 be/4 pi           18.87 M      0.00 B  0.69 %  0.01 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~r.SparkSQLCLIDriver --num-executors 3 spark-internal
 5708 be/4 pi           16.49 M      0.00 B  0.44 %  0.01 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~r.SparkSQLCLIDriver --num-executors 3 spark-internal
 5677 be/4 pi           16.01 M      0.00 B  0.67 %  0.04 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~r.SparkSQLCLIDriver --num-executors 3 spark-internal
Total DISK READ :      18.41 M/s | Total DISK WRITE :       0.00 B/s
Actual DISK READ:      11.65 M/s | Actual DISK WRITE:     713.97 K/s
  TID  PRIO  USER    DISK READ>  DISK WRITE  SWAPIN      IO    COMMAND
  606 be/4 root          5.05 G    524.00 K  0.00 % 26.04 % mount.exfat /dev/sda1 /mnt/usb -o rw,nonempty
25774 be/4 root        148.02 M     96.00 K  3.17 %  0.77 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~-0000 --worker-url spark://Worker@192.168.0.22:43827
 1231 be/4 root          6.48 M      0.00 B  0.00 % 24.02 % java -Dproc_datanode -Xmx1000m -Djava.library.path=~RFAS org.apache.hadoop.hdfs.server.datanode.DataNode
25833 be/4 root          5.93 M      0.00 B  0.04 %  0.01 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~-0000 --worker-url spark://Worker@192.168.0.22:43827
25614 be/4 root          5.63 M     24.00 K  0.21 %  0.00 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~ploy.worker.Worker --webui-port 8081 spark://r1:7077
25779 be/4 root          5.25 M     24.00 K  4.06 %  0.61 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~-0000 --worker-url spark://Worker@192.168.0.22:43827
25618 be/4 root          4.75 M      0.00 B  1.71 %  0.02 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~ploy.worker.Worker --webui-port 8081 spark://r1:7077
Total DISK READ :      31.17 M/s | Total DISK WRITE :      27.10 K/s
Actual DISK READ:      25.37 M/s | Actual DISK WRITE:     304.85 K/s
  TID  PRIO  USER    DISK READ>  DISK WRITE  SWAPIN      IO    COMMAND
  563 be/4 root          5.80 G    564.00 K  0.00 % 17.56 % mount.exfat /dev/sda1 /mnt/usb -o rw,nonempty
25069 be/4 root        200.09 M    132.00 K  4.73 %  2.72 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~-0000 --worker-url spark://Worker@192.168.0.25:44851
25238 be/4 root          7.37 M    396.00 K  0.21 %  0.88 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~-0000 --worker-url spark://Worker@192.168.0.25:44851
  980 be/4 root          6.93 M      0.00 B  0.00 % 31.54 % java -Dproc_datanode -Xmx1000m -Djava.library.path=~RFAS org.apache.hadoop.hdfs.server.datanode.DataNode
24914 be/4 root          6.26 M      4.00 K  1.95 %  0.15 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~ploy.worker.Worker --webui-port 8081 spark://r1:7077
25078 be/4 root          5.35 M     24.00 K  2.69 %  1.48 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~-0000 --worker-url spark://Worker@192.168.0.25:44851
24908 be/4 root          5.00 M     24.00 K  1.67 %  0.07 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~ploy.worker.Worker --webui-port 8081 spark://r1:7077
25127 be/4 root          4.52 M      0.00 B  0.11 %  0.01 % java -cp /opt/spark/conf/:/opt/spark/jars/*:/opt/ha~-0000 --worker-url spark://Worker@192.168.0.25:44851

Closing Thoughts

I spend 1-2 weeks a month living out of a suitcase so carrying a few motherboards and power supplies around isn't practical. When I started this project I had three Raspberry Pis shipped overnight to an Amazon drop off point next to one of my client's offices. I was able to flash the Micro SD cards off a MacBook Pro I take with me when working abroad. I got the devices to connect to a Wi-Fi hotspot I ran off of my Samsung Galaxy S8 phone in my hotel room within an hour of unpacking everything. This all felt like a very convenient and portable way to explore both these small devices and revisit Hadoop on a minimalist hardware setup.

I'll admit that the slow network connectivity and slow I/O had me making many diversions along this journey. I'm not sure most people interested in learning about Hadoop will have the patience to deal with these limitations. If you want a more practical learning experience I'd suggest trying out Amazon EMR, as so much is already set up when you launch a cluster. If you do want to use your own hardware at home I'd suggest anything from the "Modest" tier upward on Logical Increments as a shopping list of parts. You should use multiple computers, as distributing workloads and horizontal scalability are Hadoop's main selling points. Make sure to find a motherboard with built-in HDMI so you can save money on graphics cards.

Thank you for taking the time to read this post. I offer both consulting and hands-on development services to clients in North America and Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.
