Clients

Since PipelineDB is compatible with PostgreSQL 9.5, it doesn’t have its own client libraries. Instead, any client that works with PostgreSQL (or any SQL database for that matter) will work with PipelineDB.

Here you’ll find examples of a simple PipelineDB application written in a few different languages and clients. The application simply creates the CONTINUOUS VIEW:

CREATE CONTINUOUS VIEW continuous view AS
      SELECT x::integer, COUNT(*) FROM stream GROUP BY x;

The application then emits 100,000 events resulting in 10 unique groupings for the CONTINUOUS VIEW, and prints out the results.

Python

For this example in Python, you’ll need to have psycopg2 installed.

import psycopg2

conn = psycopg2.connect('dbname=test user=user host=localhost port=5432')
pipeline = conn.cursor()

create_cv = """
CREATE CONTINUOUS VIEW continuous_view AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x
"""
pipeline.execute(create_cv)
conn.commit()

rows = []

for n in range(100000):
    # 10 unique groupings
    x = n % 10
    rows.append({'x': x})

# Now write the rows to the stream
pipeline.executemany('INSERT INTO stream (x) VALUES (%(x)s)', rows)

# Now read the results
pipeline.execute('SELECT * FROM continuous_view')
rows = pipeline.fetchall()

for row in rows:
    x, count = row

    print x, count

pipeline.execute('DROP CONTINUOUS VIEW continuous_view')
pipeline.close()

Ruby

This example in Ruby uses the pg gem.

require 'pg'
pipeline = PGconn.connect("dbname='test' user='user' host='localhost' port=5432")

# This continuous view will perform 3 aggregations on page view traffic, grouped by url:
#
# total_count - count the number of total page views for each url
# uniques     - count the number of unique users for each url
# p99_latency - determine the 99th-percentile latency for each url

q = "
CREATE CONTINUOUS VIEW v AS
SELECT
  url::text,
  count(*) AS total_count,
  count(DISTINCT cookie::text) AS uniques,
  percentile_cont(0.99) WITHIN GROUP (ORDER BY latency::integer) AS p99_latency
FROM page_views GROUP BY url"

pipeline.exec(q)

for n in 1..10000 do
  # 10 unique urls
  url = '/some/url/%d' % (n % 10)

  # 1000 unique cookies
  cookie = '%032d' % (n % 1000)

  # latency uniformly distributed between 1 and 100
  latency = rand(101)

  # NOTE: it would be much faster to batch these into a single INSERT
  # statement, but for simplicity's sake let's do one at a time
  pipeline.exec(
  "INSERT INTO page_views (url, cookie, latency) VALUES ('%s', '%s', %d)"
        % [url, cookie, latency])
end

# The output of a continuous view can be queried like any other table or view
rows = pipeline.exec('SELECT * FROM v ORDER BY url')

rows.each do |row|
  puts row
end

# Clean up
pipeline.exec('DROP CONTINUOUS VIEW v')

Java

For this example you’ll need to have JDBC installed and on your CLASSPATH.

import java.util.Properties;
import java.sql.*;

public class Example {

  static final String HOST = "localhost";
  static final String DATABASE = "test";
  static final String USER = "user";

  public static void main(String[] args) throws SQLException {

    // Connect to "test" database on port 5432
    String url = "jdbc:postgresql://" + HOST + ":5432/" + DATABASE;
    ResultSet  rs;
    Properties props = new Properties();

    props.setProperty("user", USER);
    Connection conn = DriverManager.getConnection(url, props);

    Statement stmt = conn.createStatement();
    stmt.executeUpdate(
      "CREATE CONTINUOUS VIEW v AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x");

    for (int i=0; i<100000; i++)
    {
      // 10 unique groupings
      int x = i % 10;

      // INSERT INTO stream (x) VALUES (x)
      stmt.addBatch("INSERT INTO stream (x) VALUES (" + Integer.toString(x) + ")");
    }

    stmt.executeBatch();

    rs = stmt.executeQuery("SELECT * FROM v");
    while (rs.next())
    {
      int id = rs.getInt("x");
      int count = rs.getInt("count");

      System.out.println(id + " = " + count);
    }

    // Clean up
    stmt.executeUpdate("DROP CONTINUOUS VIEW v");
    conn.close();
  }
}