Parallel Processing Zip Archive CSV Files With Python and Pandas

In this post, we'll show how to read multiple CSV files from a zip archive in parallel with Python and Pandas. Each file is read into a temporary DataFrame, and its rows are appended to a single combined CSV file.

Another example of parallelization is available here: Pandas Easy Parallelization with df.iterrows() or For Loop

You can see the full code below, followed by an explanation of each step:

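from multiprocessing import Pool
from zipfile import ZipFile

import pandas as pd

import tarfile  # only needed for the tar.gz variant in Step 1

zip_path = 'data/41.zip'


def process_archive(csv_file):
    try:
        # Open the archive inside the worker so each process gets its own file handle
        with ZipFile(zip_path) as zip_file:
            df_temp = pd.read_csv(zip_file.open(csv_file))
        df_temp.to_csv('data/all_filses.csv', mode='a', header=False)
    except Exception as e:
        # Report which file failed and why
        print(f'{csv_file}: {e}')


if __name__ == '__main__':
    # Collect the names of all .csv members of the archive
    with ZipFile(zip_path) as zip_file:
        zip_files = {text_file.filename for text_file in zip_file.infolist()
                     if text_file.filename.endswith('.csv')}

    # Run the worker on every file name with 12 parallel processes
    with Pool(12) as p:
        p.map(process_archive, zip_files)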

Step 1: Read all file names from the archive

First, we collect the names of all files in the zip archive into a set. Only names ending in .csv are kept:

zip_file = ZipFile('data/41.zip')

zip_files = {text_file.filename for text_file in zip_file.infolist()
             if text_file.filename.endswith('.csv')}

results in:

  • 41/file1.csv
  • 41/file2.csv
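To try the filtering in isolation, here is a minimal sketch that builds a tiny archive in memory and lists only its .csv members. The helper name list_csv_members and the file contents are made up for illustration. It returns a sorted list rather than a set, so the order is stable between runs.

```python
import io
from zipfile import ZipFile


def list_csv_members(archive):
    """Return the sorted names of all .csv members of an open ZipFile."""
    return sorted(
        info.filename
        for info in archive.infolist()
        if info.filename.endswith('.csv')
    )


# Build a small in-memory archive; names and contents are hypothetical.
buffer = io.BytesIO()
with ZipFile(buffer, 'w') as archive:
    archive.writestr('41/file1.csv', 'a,b\n1,2\n')
    archive.writestr('41/file2.csv', 'a,b\n3,4\n')
    archive.writestr('41/readme.txt', 'not a CSV file')

with ZipFile(buffer) as archive:
    csv_names = list_csv_members(archive)

print(csv_names)
```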

Note: if you work with a tar.gz file, then you need to change how the archive is opened and how its members are read:

zip_file = tarfile.open(zip_file, "r:gz")
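As a rough sketch of what that change could look like, the snippet below reads every .csv member of a gzip-compressed tar archive with tarfile.getmembers() and tarfile.extractfile(). The function name read_csvs_from_tar and the demo archive are assumptions for illustration, and the loop is sequential rather than parallel.

```python
import io
import tarfile

import pandas as pd


def read_csvs_from_tar(fileobj):
    """Read every .csv member of a gzip-compressed tar archive into a DataFrame."""
    frames = {}
    with tarfile.open(fileobj=fileobj, mode='r:gz') as archive:
        for member in archive.getmembers():
            if member.isfile() and member.name.endswith('.csv'):
                # extractfile returns a binary file-like object for regular files
                frames[member.name] = pd.read_csv(archive.extractfile(member))
    return frames


# Build a small in-memory tar.gz; the name and contents are hypothetical.
payload = b'a,b\n1,2\n'
buffer = io.BytesIO()
with tarfile.open(fileobj=buffer, mode='w:gz') as archive:
    info = tarfile.TarInfo(name='41/file1.csv')
    info.size = len(payload)
    archive.addfile(info, io.BytesIO(payload))

buffer.seek(0)
frames = read_csvs_from_tar(buffer)
print(frames['41/file1.csv'])
```

For an archive on disk, tarfile.open(path, 'r:gz') can be used instead of passing fileobj.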

Step 2: Read the archived files with a method

We need a method that each parallel process will run. It reads one CSV file from the archive and appends its rows to a new CSV file:

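zip_path = 'data/41.zip'


def process_archive(csv_file):
    try:
        # Open the archive inside the worker so each process gets its own file handle
        with ZipFile(zip_path) as zip_file:
            df_temp = pd.read_csv(zip_file.open(csv_file))
        df_temp.to_csv('data/all_filses.csv', mode='a', header=False)
    except Exception as e:
        # Report which file failed and why
        print(f'{csv_file}: {e}')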

I've noticed that with a huge number of small files (i.e. 100 000+ CSV files) and a high number of parallel processes, errors are raised.
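The post does not say which errors occur. One possible way to find out is to have the worker return the exception text instead of only printing the file name. The sketch below assumes a hypothetical helper safe_read and a throwaway demo archive. It runs sequentially to keep the example small, but the same function could be handed to a pool.

```python
import os
import tempfile
from zipfile import ZipFile

import pandas as pd


def safe_read(zip_path, csv_name):
    """Return (csv_name, DataFrame or None, error text or None)."""
    try:
        with ZipFile(zip_path) as archive, archive.open(csv_name) as handle:
            return csv_name, pd.read_csv(handle), None
    except Exception as exc:  # broad on purpose: the goal is to record the cause
        return csv_name, None, repr(exc)


# Throwaway archive with one valid and one empty member (hypothetical names).
demo_zip = os.path.join(tempfile.mkdtemp(), 'demo.zip')
with ZipFile(demo_zip, 'w') as archive:
    archive.writestr('ok.csv', 'a,b\n1,2\n')
    archive.writestr('empty.csv', '')

names = ('ok.csv', 'empty.csv', 'missing.csv')
results = [safe_read(demo_zip, name) for name in names]

# Keep only the members that failed, together with the reason.
failures = {name: error for name, frame, error in results if error is not None}
print(failures)
```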

Note: The arguments mode='a' and header=False ensure that we write in append mode and that the header row is not written.
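Because every append skips the header, the combined file ends up with no header row at all. A minimal sketch of one way around this is to write the header once before appending. The index=False argument and the temporary output path are assumptions added here, not part of the original code.

```python
import os
import tempfile

import pandas as pd

# Hypothetical output location in a throwaway directory.
out_path = os.path.join(tempfile.mkdtemp(), 'all_files.csv')

first = pd.DataFrame({'a': [1], 'b': [2]})
second = pd.DataFrame({'a': [3], 'b': [4]})

# Write only the header row once (an empty slice keeps the column names).
first.iloc[0:0].to_csv(out_path, index=False)

# Append data rows without repeating the header.
for df_temp in (first, second):
    df_temp.to_csv(out_path, mode='a', header=False, index=False)

combined = pd.read_csv(out_path)
print(combined)
```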

Step 3: Process multiple CSV files in parallel

Finally, we perform the parallel processing by passing the file names collected in step 1 to the method defined in step 2. This is also where we set the number of parallel processes:

p = Pool(12)
p.map(process_archive, zip_files)
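If the goal is a single DataFrame in memory rather than a combined CSV file, a variation is to have each worker return its DataFrame and concatenate the results in the parent process. This is a sketch under assumptions: the helpers read_member, load_zip_parallel and make_demo_zip are hypothetical names, and the demo archive is created on the fly.

```python
import os
import tempfile
from multiprocessing import Pool
from zipfile import ZipFile

import pandas as pd


def read_member(task):
    """Worker: open the archive independently and read one CSV member."""
    zip_path, csv_name = task
    with ZipFile(zip_path) as archive, archive.open(csv_name) as handle:
        return pd.read_csv(handle)


def load_zip_parallel(zip_path, processes=2):
    """Read all .csv members of zip_path in parallel and concatenate them."""
    with ZipFile(zip_path) as archive:
        names = sorted(n for n in archive.namelist() if n.endswith('.csv'))
    tasks = [(zip_path, name) for name in names]
    with Pool(processes) as pool:
        frames = pool.map(read_member, tasks)
    return pd.concat(frames, ignore_index=True)


def make_demo_zip():
    """Create a small throwaway archive; names and contents are hypothetical."""
    zip_path = os.path.join(tempfile.mkdtemp(), 'demo.zip')
    with ZipFile(zip_path, 'w') as archive:
        archive.writestr('41/file1.csv', 'a,b\n1,2\n')
        archive.writestr('41/file2.csv', 'a,b\n3,4\n')
    return zip_path


if __name__ == '__main__':
    print(load_zip_parallel(make_demo_zip()))
```

Passing the archive path with each task lets every process open its own handle, and returning DataFrames avoids several processes appending to the same output file at once. The trade-off is that all data must fit in memory.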

Conclusion

Reading the CSV files in parallel speeds up processing. Another benefit of this technique is that disk space is saved, because the files are read directly from the archive without being extracted first.
