Venkat Krishnamurthy
Venkat Krishnamurthy

Exploring data with pandas and MapD using Apache Arrow

MapD and Python

At MapD, we've long been big fans of the PyData stack, and are constantly working on ways for our open source GPU-accelerated analytic SQL engine to play nicely with the terrific tools in the most popular stack that supports open data science. We are founding collaborators of GOAI (the GPU Open Analytics Initiative), working with the awesome folks at Anaconda and H2O.ai, and our friends at NVIDIA. In GOAI, we use Apache Arrow to mediate efficient, high-performance data interchange for analytics and AI workflows.

A big reason for doing this is to make MapD itself easily accessible to Python tools. For starters, this means supporting modern Python database interfaces like DBAPI. pymapd (built with help from Continuum) is a pythonic interface to MapD's SQL engine supporting DBAPI 2.0, and it has some extra goodness in being able to use our in-built Arrow support for both data loading and query result output.

About Apache Arrow

Apache Arrow is a project in the Apache data ecosystem that will play a pivotal role in the evolution of in-memory computing. Arrow addresses an emerging problem in building data pipelines: the cost of data exchange between different tools. This is an even bigger issue with in-memory tools and engines, which may be fast on their own internal data representations, but then slow when integrated into an end-to-end workflow. The data interchange latency slows down those pipelines.

A well-known example of this is in the Spark-Python interface. While Spark has a python interface, the data interchange within PySpark is between the JVM-based dataframe implementation in the engine, and the Python data structures was a known source of sub-optimal performance and resource consumption. Here is a great write up by Brian Cutler on how Arrow made a significant jump in efficiency within pyspark.

MapD and Arrow

At MapD, we realize the value of Arrow on multiple fronts, and we are working to integrate it deeply within our own product. First, we are finding our place in data science workflows as a modern open-source SQL engine. Arrow solves precisely the problems we expect to encounter related to data interchange. Second, a natural outcome of being a GPU-native engine means that there is great interest in integrating MapD into Machine Learning where Arrow forms the foundation of the GPU dataframe, which provides a highly performant, low-overhead data interchange mechanism with tools like h2o.ai, TensorFlow, and others.

Getting started with pymapd

Let's explore pymapd a little further! To start with, you'll want to install it, from conda-forge

conda install -c conda-forge pymapd

or with pip

pip install pymapd

If you havent already done so, you'll also need to ensure you have a running mapd server that we'll assume is running at localhost:9091. For instructions, see here on how to get set up.

Setting up a connection
import pandas as pd  
import sys  
from pymapd import connect

con = connect(user="mapd", password="HyperInteractive", host="localhost", dbname="mapd")  
con  
Connection(mapd://mapd:***@localhost:9091/mapd?protocol=binary)
Working with a MapD database

MapD SQL support is outlined in the docs. Here, we'll run through a full life cycle of loading data, querying it and handling output.

For this illustration, we'll use some data of recent interest - this dataset is from the Center for Medicare and Medicaid Services, available online here. This dataset is part of what the CMS terms a 'PUF': a Public Use File containing data about prescription drug claims under the Medicare Part D benefit program, covering prescribers, prescriptions and the drugs themselves. It's a topical dataset because of the recent opioid crisis. Like all good data science people, we want to first understand what the data is about, and how it's structured - so let's go straight to the data dictionary available online as part of the Methodology Document.

NOTE: We'd normally recommend using MapD's Immerse UI to start playing with this whole set, but we'll stay within pymapd to illustrate how you could access the power and performance of MapD within a typical Python database interaction workflow.

We're going to grab the Provider Summary dataset, a 135mb zipped file from here. This has provider summary data, as a record for each registered provider in the USA, along with summary information about them, counts of prescriptions for different drug classes, etc. It's a fairly large file - 125MB compressed, about 339MB uncompressed. We'd usually avoid completely loading this into pandas, but part of this exercise is to see how we can handle messy real-world datasets with the tools normally used in data science, and how MapD fits in seamlessly into that toolset.

First, let's load data into a pandas dataframe, mainly to illustrate how Arrow is used in the data load path as well. Dont forget to modify the file path appropriately below.

prescriber_df = pd.read_csv("data/PartD_Prescriber_PUF_NPI_15.txt", sep='\t', low_memory=False)  
prescriber_df.head()  

Let's profile this a little further

prescriber_df.shape  
(1102268, 84)

So we have about 1.1 million rows and 84 columns. Now, let's finally get to loading this into MapD. We'll first use pymapd's built-in support for pandas dataframe loading, using the create_table call on the connection. Note that for now, this doesn’t actually load the dataframe, it uses pandas metadata to create the table without requiring an explicit DDL statement. Also, one thing to watch out related to working with pandas and pymapd (at least for now) is that we need to handle nulls carefully, so that the MapD handoff works. Since this dataset has NaNs in both numeric and string columns, we'll first take care to swap the nulls with zeros for numeric columns, and the NA sentinel value for string columns.

str_cols = prescriber_df.columns[prescriber_df.dtypes==object]  
prescriber_df[str_cols] = prescriber_df[str_cols].fillna('NA')  
prescriber_df.fillna(0,inplace=True)  
prescriber_df.head()  

Having dealt with NaNs, we'll actually load the data. Here, we'll use pymapd's load_table method, that automatically selects arrow when loading from a pandas dataframe. Let's also time the load operation.

con.execute('drop table if exists cms_prescriber')  
con.create_table("cms_prescriber",prescriber_df, preserve_index=False)  
%time con.load_table("cms_prescriber", prescriber_df, preserve_index=False)
CPU times: user 4.44 s, sys: 2.01 s, total: 6.44 s
Wall time: 18.1 s

Success! We could load the whole table via Arrow in roughly 17 seconds on a dual-core i7 MacBook Pro (remember this is using the Arrow load path for illustrative purposes. MapD's native loader is quite a bit faster, and we're working on further optimizations to the load from Arrow).

Let's see what the table structure looks like

con.get_table_details("cms_prescriber")  
[ColumnDetails(name='npi', type='BIGINT', nullable=True, precision=0, scale=0, comp_param=0),
 ColumnDetails(name='nppes_provider_last_org_name', type='STR', nullable=True, precision=0, scale=0, comp_param=0),
 ColumnDetails(name='nppes_provider_first_name', type='STR', nullable=True, precision=0, scale=0, comp_param=0),
 ColumnDetails(name='nppes_provider_mi', type='STR', nullable=True, precision=0, scale=0, comp_param=0),
 ColumnDetails(name='nppes_credentials', type='STR', nullable=True, precision=0, scale=0, comp_param=0),
 ColumnDetails(name='nppes_provider_gender', type='STR', nullable=True, precision=0, scale=0, comp_param=0),
 ColumnDetails(name='nppes_entity_code', type='STR', nullable=True, precision=0, scale=0, comp_param=0),
 ColumnDetails(name='nppes_provider_street1', type='STR', nullable=True, precision=0, scale=0, comp_param=0),
 ...(many more column details elided for readability)

Well, 84 columns - a big gnarly table for sure, but the cool part is, we got away without having to extract an explicit DDL statement, by letting pandas do that for us. Again - this isn’t necessarily the best method - you'd usually want to define the table a bit more carefully, taking care of column data types, etc. We'll explore that later.

Let's keep going - we'll start with a count (*), to ensure everything loaded.

c = con.cursor()  
c.execute("select count(*) from cms_prescriber")  
[result for result in c]
[(1102268,)]

Looks like all rows loaded. Next, let's run a simple select to get at the first 10 rows.

c.execute("select * from cms_prescriber limit 10")  
[print(result) for result in list(c)]
(1912175340, "&H'S)U", "&E'K:(A:I", 'NA', 'MD', 'M', 'I', '301 THE ALAMEDA UNIT 82', 'NA', 'SAN JUAN BAUTISTA', 95045.0, 7001.0, 'CA', 'US', 'General Surgery', 'S', 'E', 106, 106.0, 1091.13, 1094, 52.0, 'NA', 80.0, 80.0, 899.77, 862.0, 'NA', 37.0, '*', 0.0, 0.0, '#', 0.0, 0.0, 'NA', 0.0, 0.0, '*', 0.0, 0.0, '#', 0.0, 0.0, 'NA', 66.0, 757.93, 'NA', 40.0, 333.2, 16.0, 194.0, 225.0, 16.0, 15.09, 15.0, 424.71, 0.0, 'NA', 0.0, 0.0, 'NA', 0.0, 'NA', 0.0, 0.0, 'NA', 0.0, 66.0, 15.0, 0.0, 0.0, 0.0, 31.0, 21.0, 0.0, 0.0, 0.0, 25.0, 0.0, 0.0, 24.0, 28.0, 1.244)

(1417051921, 'A', 'N', 'D', 'PA-C', 'F', 'I', '522 HEATHER RIDGE', 'NA', 'CATOOSA', 74015.0, 0.0, 'OK', 'US', 'Physician Assistant', 'S', 'E', 330, 330.0, 10656.77, 2905, 195.0, 'NA', 230.0, 230.0, 5215.87, 1856.0, 'NA', 141.0, '#', 0.0, 0.0, 'NA', 282.0, 5449.35, '*', 0.0, 0.0, 'NA', 44.0, 1332.95, 'NA', 286.0, 9323.82, 'NA', 108.0, 5067.71, 'NA', 222.0, 5589.06, 24.0, 175.19, 73.0, 24.0, 7.27, 182.0, 4299.52, 153.0, 'NA', 15.0, 223.21, 'NA', 13.0, 'NA', 0.0, 0.0, 'NA', 0.0, 67.0, 54.0, 99.0, 0.0, 0.0, 127.0, 68.0, 166.0, 0.0, 0.0, 0.0, 0.0, 0.0, 149.0, 46.0, 0.8248)
...8 more rows not shown for readability

The above call used MapD's thrift serialization API. We can also use the Arrow-based select_ipcand select_ipc_gpu calls which create Arrow buffers in CPU or GPU shared memory. Here's an example that shows how to do a zero-copy read of a MapD query result set into a pandas dataframe.

Let's look for the top 10 zip codes that show the providers that submit the largest aggregate number of opioid prescription claims. We'll add a timer while we're at it.

%timeit df = con.select_ipc("select CAST(nppes_provider_zip5 as INT) as zipcode, \
sum(total_claim_count) as total_claims, \  
sum(opioid_claim_count) as opioid_claims from cms_prescriber \  
group by 1 order by opioid_claims desc limit 100")  
54.5 ms ± 2.05 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

This takes ~54-55ms on average, for a million records, running on a i7 2016 MacBook Pro (your times may vary).

Let's check the results themselves, using normal pandas dataframe operations. For comparison, let's run an equivalent operation on the original dataframe itself.

%timeit df_p = prescriber_df.groupby(['nppes_provider_zip5'])[['total_claim_count', 'opioid_claim_count']].sum()
53.4 ms ± 864 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

So, the MapD aggregation query took approximately the same wall clock time as the in-memory pandas local dataframe. The difference becomes even more apparent when dealing with more complex queries on much larger datasets that cannot be manipulated in pandas directly.

Let's quickly peek at the query results. It looks like zip code 72205 (Little Rock AR) and 76104 (Fort Worth, TX) are among the places where providers submitted the largest number of opioid prescription claims.

df.head()  

Wrapping up

So there you have it - an example of how pymapd can bridge nicely into the PyData workflow you already have. We're working hard on pushing this integration deeper so that the experience of using MapD from a notebook within a PyData data science workflow is as seamless as possible, with help from our friends at Anaconda.

Also, via GOAI and Apache Arrow, we're pushing further on integrating MapD with Machine Learning tools without the unnecessary overhead of data interchange. Thanks to NVIDIA, h2o.ai, and specifically Wes McKinney and the Arrow contributor community for their work on Arrow and pymapd as well.

Try out MapD

Liked what you saw? You can download the Jupyter notebook here. Let us know what you think, on our community forums, or on github. You can also download a fully featured community edition (that includes the open source MapD SQL engine, and our data exploration UI called Immerse) here.

Venkat Krishnamurthy

About the Author

Venkat is Senior Director, Product Management for MapD Core who's been on day 1 of Learn CUDA in 21 days for the last 4 months, and spends his weekends annoying his kids with dad jokes and attempting to play guitar or bass along with James Brown records.