In Zeppelin, you can import the NYC TAXI Benchmark notebook with the link below.

NYC_TAXI_BM_load_and_query.json

1. Put the CSV files into HDFS

%sh
hdfs dfs -mkdir -p /nyc_taxi/csv

hdfs dfs -put /nvme_ssd/nyc_taxi/csv_gz/csv1/* /nyc_taxi/csv
hdfs dfs -put /nvme_ssd/nyc_taxi/csv_gz/csv2/* /nyc_taxi/csv
hdfs dfs -put /nvme_ssd/nyc_taxi/csv_gz/csv3/* /nyc_taxi/csv
hdfs dfs -put /nvme_ssd/nyc_taxi/csv_gz/csv4/* /nyc_taxi/csv
hdfs dfs -put /nvme_ssd/nyc_taxi/csv_gz/csv5/* /nyc_taxi/csv
hdfs dfs -put /nvme_ssd/nyc_taxi/csv_gz/csv6/* /nyc_taxi/csv
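
To confirm the upload before loading, you can list the target directory from a Spark paragraph (a minimal sanity check; it assumes the /nyc_taxi/csv path used above):

%spark
import org.apache.hadoop.fs.{FileSystem, Path}

// List the uploaded files and report their combined size.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val files = fs.listStatus(new Path("/nyc_taxi/csv")).filter(_.isFile)
println(s"${files.length} files, ${files.map(_.getLen).sum / (1024 * 1024)} MB total")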

2. Create a dataframe and load the data

%spark

import org.apache.spark.sql.types._

val taxiSchema = StructType(Array(
        StructField("trip_id", IntegerType, true),
        StructField("vendor_id", StringType, true),
        StructField("pickup_datetime", TimestampType, true),
        StructField("dropoff_datetime", TimestampType, true),
        StructField("store_and_fwd_flag", StringType, true),
        StructField("rate_code_id", IntegerType, true),
        StructField("pickup_longitude", DoubleType, true),
        StructField("pickup_latitude", DoubleType, true),
        StructField("dropoff_longitude", DoubleType, true),
        StructField("dropoff_latitude", DoubleType, true),
        StructField("passenger_count", StringType, true),
        StructField("trip_distance", DoubleType, true),
        StructField("fare_amount", DoubleType, true),
        StructField("extra", DoubleType, true),
        StructField("mta_tax", DoubleType, true),
        StructField("tip_amount", DoubleType, true),
        StructField("tolls_amount", DoubleType, true),
        StructField("improvement_surcharge", DoubleType, true),
        StructField("total_amount", DoubleType, true),
        StructField("payment_type", StringType, true),
        StructField("trip_type", IntegerType, true),
        StructField("cab_type", StringType, true),
        StructField("precipitation", DoubleType, true),
        StructField("snow_depth", DoubleType, true),
        StructField("snowfall", DoubleType, true),
        StructField("max_temperature", IntegerType, true),
        StructField("min_temperature", IntegerType, true),
        StructField("average_wind_speed", DoubleType, true),
        StructField("pickup_nyct2010_gid", IntegerType, true),
        StructField("pickup_ctlabel", StringType, true),
        StructField("pickup_borocode", IntegerType, true),
        StructField("pickup_boroname", StringType, true),
        StructField("pickup_ct2010", StringType, true),
        StructField("pickup_boroct2010", StringType, true),
        StructField("pickup_cdeligibil", StringType, true),
        StructField("pickup_ntacode", StringType, true),
        StructField("pickup_ntaname", StringType, true),
        StructField("pickup_puma", StringType, true),
        StructField("dropoff_nyct2010_gid", IntegerType, true),
        StructField("dropoff_ctlabel", StringType, true),
        StructField("dropoff_borocode", IntegerType, true),
        StructField("dropoff_boroname", StringType, true),
        StructField("dropoff_ct2010", IntegerType, true),
        StructField("dropoff_boroct2010", StringType, true),
        StructField("dropoff_cdeligibil", StringType, true),
        StructField("dropoff_ntacode", StringType, true),
        StructField("dropoff_ntaname", StringType, true),
        StructField("dropoff_puma", StringType, true)
    ))

val taxiDF = spark.read.format("csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("mode", "FAILFAST")
    .schema(taxiSchema)
    .load("/nyc_taxi/csv/*.csv.gz")

3. Create a temp view for the dataframe

%spark
taxiDF.createOrReplaceTempView("trips")
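
The view is session-scoped and immediately queryable from Scala or SQL paragraphs; for example (a trivial smoke test):

%spark
// Any paragraph in this session can now query the view by name.
spark.sql("SELECT vendor_id, count(*) FROM trips GROUP BY vendor_id").show()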

4. Transform the dataframe for Lightning DB

%spark
import org.apache.spark.sql.functions._
val deltaDf = taxiDF
    .filter($"pickup_datetime".isNotNull && $"passenger_count".isNotNull && $"cab_type".isNotNull)
    .withColumn("pickup_yyyyMMddhh", from_unixtime(unix_timestamp($"pickup_datetime"), "yyyyMMddHH")) // HH = 24-hour clock; hh would fold PM hours into AM
    .withColumn("round_trip_distance", round($"trip_distance"))

deltaDf.printSchema()
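
A small projection (optional) shows the derived columns next to their sources, which makes it easy to verify the timestamp formatting and the rounding:

%spark
// Spot-check the derived partition columns against the raw values.
deltaDf.select("pickup_datetime", "pickup_yyyyMMddhh", "trip_distance", "round_trip_distance")
    .show(5, false)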

5. Create a temp view for Lightning DB, with r2 options that configure Lightning DB as the data source

%spark
val r2Options = Map[String, String]("table" -> "100",
      "host" -> "192.168.111.35",
      "port" -> "18800",
      "partitions" -> "pickup_yyyyMMddhh passenger_count cab_type",
      "mode" -> "nvkvs",
      "rowstore" -> "false",
      "group_size" -> "40",
      "at_least_one_partition_enabled" -> "no")
spark.read.format("r2")
    .schema(deltaDf.schema)
    .options(r2Options)
    .load()
    .createOrReplaceTempView("fb_trips")
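
A DESCRIBE on the new view (optional) confirms it picked up deltaDf's schema, including the three partition columns listed in r2Options:

%spark
// fb_trips should expose the same columns and types as deltaDf.
spark.sql("DESCRIBE fb_trips").show(60, false)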

6. Load the data from the dataframe into Lightning DB

%spark
deltaDf.write
    .format("r2")
    .insertInto("fb_trips")

7. Enable the ‘aggregation pushdown’ feature

%sql
SET spark.r2.aggregation.pushdown=true
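
The same switch can also be flipped from a Scala paragraph, which is convenient when scripting the benchmark end to end:

%spark
// Equivalent to the SET statement above; applies to this session only.
spark.conf.set("spark.r2.aggregation.pushdown", "true")
println(spark.conf.get("spark.r2.aggregation.pushdown"))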

8. Run the ‘NYC TAXI Benchmark’ queries

Q1

%sql
SELECT cab_type, count(*) FROM fb_trips GROUP BY cab_type

Q2

%sql
SELECT passenger_count,
       avg(total_amount)
FROM fb_trips
GROUP BY passenger_count

Q3

%sql
SELECT passenger_count,
       substring(pickup_yyyyMMddhh, 1, 4),
       count(*)
FROM fb_trips
GROUP BY passenger_count, 
         substring(pickup_yyyyMMddhh, 1, 4)

Q4

%sql
SELECT passenger_count,
       substring(pickup_yyyyMMddhh, 1, 4),
       round_trip_distance,
       count(*)
FROM fb_trips
GROUP BY 1,
         2,
         3
ORDER BY 2,
         4 DESC
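
Zeppelin reports per-paragraph execution time, but if you want all four timings from a single paragraph, one option (a simple sketch using Spark's built-in spark.time) is to run the queries in a loop:

%spark
// Run Q1-Q4 and print wall-clock time for each; collect() forces full execution.
val queries = Seq(
  "SELECT cab_type, count(*) FROM fb_trips GROUP BY cab_type",
  "SELECT passenger_count, avg(total_amount) FROM fb_trips GROUP BY passenger_count",
  "SELECT passenger_count, substring(pickup_yyyyMMddhh, 1, 4), count(*) FROM fb_trips GROUP BY passenger_count, substring(pickup_yyyyMMddhh, 1, 4)",
  "SELECT passenger_count, substring(pickup_yyyyMMddhh, 1, 4), round_trip_distance, count(*) FROM fb_trips GROUP BY 1, 2, 3 ORDER BY 2, 4 DESC"
)
queries.zipWithIndex.foreach { case (q, i) =>
  print(s"Q${i + 1} ")
  spark.time(spark.sql(q).collect())
}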