When dealing with massive datasets, traditional methods of data processing can be slow and inefficient. Fortunately, Apache Spark provides a powerful framework for distributed processing that allows for much faster and more scalable data manipulation.

In this blog post, we will explore how to use Apache Spark to import a massive CSV/JSON (as in many gigabytes and tens of millions of lines), do some data processing and cleaning of the text-based data, and then save the new rows into a database using Python.

The Problem

Before we dive into the Spark solution, let’s take a look at how you might handle a large CSV file in a PHP Laravel app. Laravel provides a built-in CSV importer that allows you to chunk through the file and process the data. Here’s an example of how you might use Laravel’s chunking method to import a large CSV file:

$reader = Reader::createFromPath($filePath);
$header = $reader->fetchOne();

// Loop through the rows and insert them into the database
$reader->setHeaderOffset(0);
$count = 0;
        
foreach ($reader->getRecords() as $row) {
    // Create a new model instance
    $model = new MyModel;
            
    // Map the fields from the CSV to the model attributes
    $model->fill(array_combine($header, $row));
    $model->save();
            
    $count++;
            
    if ($count % 1000 == 0) {
        $this->info("Imported {$count} rows");
    }
}

There’s a key problem here, which is that we are processing line by line, one at a time. If there’s complex work involved, and each action takes even 1 second to run, we end up with a script that will take around a year to finish.

Spark

Apache Spark is a distributed computing framework that enables users to process large volumes of data quickly and efficiently. At its core, Spark consists of a cluster computing system that allows for distributed processing across a cluster of computers. This means that instead of relying on a single machine to process data, Spark distributes the work across multiple machines, allowing for much faster processing speeds. Spark also provides a variety of APIs that allow users to manipulate data in a distributed manner, making it a powerful tool for handling large datasets.

One of the key advantages of Spark is its ability to perform in-memory processing. Unlike traditional data processing systems, which typically rely on disk-based processing, Spark keeps data in memory whenever possible. This means that once data is loaded into Spark, subsequent operations can be performed much more quickly, as the data is readily available in memory. In addition to its in-memory processing capabilities, Spark also provides a wide range of libraries and APIs for handling different types of data, such as structured data, unstructured data, and machine learning data. These features make Spark a powerful tool for handling big data, and one that is well-suited to a wide range of use cases.

Realistically, Spark really comes into its own when doing genuinely massive tasks, like training large data models. Spark can integrate directly with tools you probably already use like scikit-learn and TensorFlow. But we can also use it for simpler tasks that would otherwise take a long time, and we want to be able to run the task not just across multiple processes but multiple (even hundreds) of servers.

Sample Approach

First, we need to set up our environment. We will assume that we have a Spark cluster already set up, with the appropriate Python packages installed.

Next, we will use the pyspark library to load the CSV file into a Spark DataFrame. The SparkSession object provides a convenient interface for working with Spark, and we can use the read.csv method to read in the CSV file. We can also specify options such as the delimiter and header row.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ImportCSV").getOrCreate()
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)

Once we have loaded the data into a DataFrame, we can perform various data processing and cleaning operations using the pyspark.sql.functions module. For example, we can use the regexp_replace function to remove any unwanted characters or strings from the data.

from pyspark.sql.functions import regexp_replace

df = df.withColumn("column_name", regexp_replace(df["column_name"], "[^a-zA-Z0-9 ]", ""))

After we have performed all necessary data processing, we can then save the DataFrame to our database using the pyspark.sql.DataFrameWriter class. We will need to provide the JDBC URL, username, and password for the database, as well as specify the table name and any other options. There’s of course many ways to do this and many things you could be doing with the data - this is just a very quick example.

url = "jdbc:mysql://localhost:3306/mydatabase"
properties = {"user": "myusername", "password": "mypassword"}
table_name = "mytable"

df.write.jdbc(url=url, table=table_name, mode="append", properties=properties)

By default, Spark will use multiple parallel connections to write the data to the database, which can be useful for improving performance. However, if we want to limit the speed at which the data is written to the database, we can do so by setting the numPartitions option to a lower value.

df.write.jdbc(url=url, table=table_name, mode="append", properties=properties, numPartitions=4)

This will limit the number of parallel processes to four, which can help prevent the database from becoming overwhelmed. You will want to test and adjust this. And if you have a powerful enough database you might not need it at all.

Something that might have taken hours or days (or simply been impossible) to run in a conventional way can be handled impressively quickly. With a powerful enough cluster, even enormous tasks can be run quickly and efficiently.