A billion rows per second

Metaprogramming Python for Big Data


Ville Tuulos
Principal Engineer @ AdRoll

ville.tuulos@adroll.com

We faced the key technical challenge of modern Business Intelligence:

How to query tens of billions of events interactively?

Our solution, DeliRoll, is implemented in Python.

Everyone knows that Python is SLOW.

You can't handle big data with low latency in Python!

Small Benchmark

  • Data: 1.5 billion rows, 400 columns - 660GB.
  • Smaller execution time is better.
  • Fastest result chosen from 10 consecutive runs.
  • Queries are cached, results are not.

Setup


                 DeliRoll       Amazon Redshift
Instance type    cr1.8xlarge    Redshift 8XL
Instances        1              3
Cores            16             32
RAM              244GB          240GB
Cost / Month     $2,520         $9,792

Aggregate One Column

SELECT sum(column) FROM table

Redshift

1.9s

DeliRoll

0.6s

Group by Dimension

SELECT sum(column) FROM table GROUP BY dimension

Redshift

21.6s

DeliRoll

17.4s

Aggregate 7 Columns

SELECT sum(col1),sum(col2),...,sum(col7) FROM table

Redshift

4.8s

DeliRoll

0.7s

Aggregate 250 Columns

SELECT sum(col1),sum(col2),...,sum(col250) FROM table

Redshift

...8 minutes!

DeliRoll

1.9s

Unpossible!

DeliRoll Architecture

Frontend (e.g. Tableau)
PostgreSQL
Multicorn
DeliRoll Server
DeliRoll Worker
Numba
LLVM
Densely-encoded data

Bad programmers worry about the code.

Good programmers worry about

data structures

and their relationships.


Linus Torvalds

Raw Data: Homogeneous Events


{"cookie_set":true,"country":"Sweden","cost":840}
{"cookie_set":false,"country":"UK","cost":23}
{"cookie_set":true,"country":"China","cost":11}

Remove Redundant Field Names


{"cookie_set":true,"country":"Sweden","cost":840}
{"cookie_set":false,"country":"UK","cost":23}
{"cookie_set":true,"country":"China","cost":11}

Encode Values in Binary


{"cookie_set": integer ,"country": integer ,"cost": integer }
{"cookie_set": integer ,"country": integer ,"cost": integer }
{"cookie_set": integer ,"country": integer ,"cost": integer }

Trick #0: Sparse Matrix

100% of the original size

Trick #1: Typed Columns

50% of the original size

Trick #2: Sort Dimensions

50% of the original size

Trick #3: Merge Dimensions

20% of the original size

Trick #4: Dense Encoding

15% of the original size
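As a rough sketch of tricks #1 and #2 (illustrative data, not the talk's): typed NumPy columns replace parsed objects, and sorting by a dimension makes the column run-length friendly.


import numpy as np

# Trick #1: typed columns instead of parsed objects.
country = np.array([2, 0, 1, 0, 2, 1, 0], dtype=np.uint8)   # dictionary-encoded
cost = np.array([11, 840, 23, 100, 7, 55, 12], dtype=np.uint32)

# Trick #2: sort all columns by the dimension.
order = np.argsort(country, kind='stable')
country_sorted = country[order]
cost_sorted = cost[order]

# Equal values are now contiguous, so the dimension column shrinks
# to (value, run-length) pairs:
values, counts = np.unique(country_sorted, return_counts=True)
print(list(zip(values.tolist(), counts.tolist())))  # -> [(0, 3), (1, 2), (2, 2)]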

Why a Compressed Data Structure?

Compression ratio dropped from 11:1 to 1.5:1

Less redundancy

The dense encoding results in faster computation

L1/L2 cache efficiency

The dense encoding is partially self-indexing
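A sketch of the self-indexing idea (the principle, not DeliRoll's exact encoding): once a dimension is sorted, all rows for a given value form one contiguous range that binary search finds without a scan.


import numpy as np

country_sorted = np.array([0, 0, 0, 1, 1, 2, 2], dtype=np.uint8)
cost_sorted = np.array([840, 100, 12, 23, 55, 11, 7], dtype=np.uint32)

# SELECT sum(cost) WHERE country = 1, with no full scan:
lo = np.searchsorted(country_sorted, 1, side='left')
hi = np.searchsorted(country_sorted, 1, side='right')
print(cost_sorted[lo:hi].sum())  # -> 78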

Numba is a just-in-time specializing compiler which compiles annotated Python and NumPy code to LLVM.



from numba import autojit

@autojit
def sum2d(arr):
    # Sum every element of a 2-D array. Numba infers argument types at
    # the first call and compiles the loops to machine code via LLVM.
    M, N = arr.shape
    result = 0.0
    for i in range(M):
        for j in range(N):
            result += arr[i, j]
    return result
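Usage is an ordinary function call; the first call triggers compilation for the given argument types, and later calls reuse the machine code.


import numpy as np

arr = np.arange(12, dtype=np.float64).reshape(3, 4)
print(sum2d(arr))  # -> 66.0

(In current Numba, autojit is gone; plain @jit gives the same lazy, type-specializing compilation.)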

DeliRoll's Python code generates a Python function based on the query (a minimal sketch of the technique follows this pipeline)


with self.DEF([(m, types[m]) for m in args]):
    self.INIT_SPARSEVAR(score_metrics)
    with self.FOR_ALL_ROWS(has_row_filter):
        with self.ROW_MATCHES(flags, where_expr, score_metrics, partitioned):
            self.ASSIGN_METRICS(score_metrics)

which compiles to Numba-compatible low-level Python


while (_indptr_idx < _indptr_len):
    while (True):
        _byte = _indptr[_indptr_idx]
        if (_byte & 1):
            _indptr_row += (_byte >> 1) + 1
            _indptr_val |= (_byte >> 1) << _shift

which compiles to LLVM


  while_condition_20:15:
    %_shift_2 = phi i32
    %_indptr_idx_3 = phi i64
    while_body_21:12:
       %81 = mul i64 %_indptr_idx_3, %29
       %82 = add i64 0, %81
       %83 = getelementptr i8* %21, i64 %82

which compiles to optimized machine code, on the fly.
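To make the metaprogramming step concrete, here is a minimal sketch in the same spirit as the DEF/FOR_ALL_ROWS generator above (hypothetical, not DeliRoll's implementation): context managers append indented source lines, and exec() turns the result into a real function.


from contextlib import contextmanager

class CodeGen:
    def __init__(self):
        self.lines = []
        self.indent = 0

    def emit(self, line):
        self.lines.append('    ' * self.indent + line)

    @contextmanager
    def block(self, header):
        # Emit a block header and indent everything inside the 'with'.
        self.emit(header)
        self.indent += 1
        yield
        self.indent -= 1

    def compile(self, name):
        # Turn the accumulated source into a callable.
        namespace = {}
        exec('\n'.join(self.lines), namespace)
        return namespace[name]

gen = CodeGen()
with gen.block('def query(rows):'):
    gen.emit('total = 0')
    with gen.block('for row in rows:'):
        with gen.block('if row[0] > 0:'):  # a hypothetical WHERE clause
            gen.emit('total += row[1]')    # the aggregated metric
    gen.emit('return total')

query = gen.compile('query')
print(query([(1, 10), (0, 5), (2, 7)]))  # -> 17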
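The generated inner loop shown earlier decodes variable-length integers byte by byte. Here is the general technique, a classic LEB128-style varint decoder rather than DeliRoll's exact format:


def decode_varint(buf, pos):
    # Each byte carries 7 payload bits; a set high bit means "more bytes".
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7f) << shift
        if not (byte & 0x80):
            return result, pos
        shift += 7

data = bytes([0xac, 0x02])     # varint encoding of 300
print(decode_varint(data, 0))  # -> (300, 2)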

Multicorn allows you to access any data source from your PostgreSQL database using Python.

In Postgres, create a new foreign table using Multicorn


CREATE FOREIGN TABLE constanttable (
    test character varying, test2 character varying
) SERVER multicorn_srv OPTIONS (
    wrapper 'myfdw.ConstantForeignDataWrapper'
);
SELECT * FROM constanttable;

SQL query gets forwarded to our Python class by Postgres


from multicorn import ForeignDataWrapper

class ConstantForeignDataWrapper(ForeignDataWrapper):
    def __init__(self, options, columns):
        super(ConstantForeignDataWrapper, self).__init__(options, columns)
        self.columns = columns

    def execute(self, quals, columns):
        # Return four rows, one value per declared column.
        for index in range(4):
            line = {}
            for column_name in self.columns:
                line[column_name] = '%s %s' % (column_name, index)
            yield line

Query is evaluated and results are returned back to Postgres


  test   |  test2
---------+---------
 test 0  | test2 0
 test 1  | test2 1
 test 2  | test2 2
 test 3  | test2 3

Cheating?


That's the whole point.


You can use a high-level language to quickly implement domain-specific solutions that outperform generic solutions, regardless of the language they use.

And, most importantly,

You dare to do things
that would be too painful to do
with a lower-level language.

Thank You!


AdRoll is Hiring


Python, Erlang, Big Data, AWS


Fastest Growing Advertising Company Two Years in a Row