Processing large volumes of data safely and fast using Node.js and PostgreSQL

About designing a PostgreSQL client for Node.js

  • Promotes writing raw SQL.
  • Discourages ad-hoc dynamic generation of SQL.
  • Assertions and type safety.
  • Safe connection handling.
  • Safe transaction handling.
  • Safe value interpolation 🦄.
  • Detailed logging.
  • Asynchronous stack trace resolution.
  • Middlewares.

Battle-Tested 👨‍🚒

Slonik logs provide information about the query execution times, stack traces and value bindings.

Repeating code patterns and type safety

// @flow
import {
sql
} from 'slonik';
import type {
DatabaseConnectionType
} from 'slonik';
opaque type DatabaseRecordIdType = number;

const getFooIdByBar = async (
connection: DatabaseConnectionType,
bar: string
): Promise<DatabaseRecordIdType> => {
const fooResult = await connection.query(sql`
SELECT id
FROM foo
WHERE bar = ${bar}
`);
if (fooResult.rowCount === 0) {
throw new Error('Resource not found.');
}
if (fooResult.rowCount > 1) {
throw new Error('Data integrity constraint violation.');
}
return fooResult.rows[0].id;
};
const getFooIdByBar = (
connection: DatabaseConnectionType,
bar: string
): Promise<DatabaseRecordIdType> => {
return connection.oneFirst(sql`
SELECT id
FROM foo
WHERE bar = ${bar}
`);
};
  • NotFoundError if query returns no rows
  • DataIntegrityError if query returns multiple rows
  • DataIntegrityError if query returns multiple columns
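
The three checks listed above can be sketched as a thin wrapper around a generic query function. This is not Slonik's implementation: `oneFirstSketch`, the two error classes and the stub connection are hypothetical names used only for illustration, and the connection is assumed to resolve to an object with a `rows` array.

```javascript
// Hypothetical error types, named after the errors listed above.
class NotFoundError extends Error {}
class DataIntegrityError extends Error {}

// Sketch: run a query and return the only value of the only row.
// Assumption: `connection.query` resolves to `{rows: Array<Object>}`.
const oneFirstSketch = async (connection, query) => {
  const {rows} = await connection.query(query);

  if (rows.length === 0) {
    throw new NotFoundError('Resource not found.');
  }

  if (rows.length > 1) {
    throw new DataIntegrityError('Expected exactly one row.');
  }

  const columnNames = Object.keys(rows[0]);

  if (columnNames.length !== 1) {
    throw new DataIntegrityError('Expected exactly one column.');
  }

  return rows[0][columnNames[0]];
};

// Stub connection that returns canned rows instead of talking to PostgreSQL.
const createStubConnection = (rows) => {
  return {
    query: async () => {
      return {rows};
    }
  };
};
```

Because the assertions live in one place, every call site gets the same guarantees without repeating the `rowCount` checks.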
const fooId = await connection.many(sql`
SELECT id
FROM foo
WHERE bar = ${bar}
`);
await connection.query(sql`
DELETE FROM baz
WHERE foo_id = ${fooId}
`);

Protecting against unsafe connection handling

// Note: This example is using unsupported API.
const main = async () => {
const connection = await pool.connect();
await connection.query(sql`SELECT foo()`);
await connection.release();
};
// Note: This example is using unsupported API.
const main = async () => {
const connection = await pool.connect();
let lastExecutionResult;
try {
lastExecutionResult = await connection.query(sql`SELECT foo()`);
} finally {
await connection.release();
}
return lastExecutionResult;
};
const main = () => {
return pool.connect((connection) => {
return connection.query(sql`SELECT foo()`);
});
};
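
One way to see why the callback form is safer is to sketch how such a `connect` could be built on top of a pool that only exposes manual acquire and release. The names `createSafePool`, `acquire`, `release` and `createFakeRawPool` are hypothetical and are not part of Slonik; the fake pool exists only so the sketch runs without a database.

```javascript
// Sketch: wrap a pool whose raw API is `acquire()` / `release()` (both
// hypothetical names) so that callers cannot forget to release.
const createSafePool = (rawPool) => {
  return {
    connect: async (routine) => {
      const connection = await rawPool.acquire();

      try {
        // The routine's result (or rejection) is passed through to the caller.
        return await routine(connection);
      } finally {
        // Runs whether the routine resolved or threw.
        await rawPool.release(connection);
      }
    }
  };
};

// In-memory stand-in for a real pool; counts connections currently checked out.
const createFakeRawPool = () => {
  const state = {active: 0};

  return {
    state,
    acquire: async () => {
      state.active += 1;

      return {
        query: async (statement) => {
          return {rows: [], statement};
        }
      };
    },
    release: async () => {
      state.active -= 1;
    }
  };
};
```

The connection never escapes the routine, so the `try`/`finally` has to be written once, inside the wrapper, instead of at every call site.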

Protecting against unsafe transaction handling

connection.transaction(async (transactionConnection) => {
await transactionConnection.query(sql`INSERT INTO foo (bar) VALUES ('baz')`);
await transactionConnection.query(sql`INSERT INTO qux (quux) VALUES ('quuz')`);
});
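
The same idea applies to transactions. The following is a minimal sketch, not Slonik's code, of a helper that commits when the routine resolves and rolls back when it rejects. `transactionSketch` and `createRecordingConnection` are hypothetical names, and the connection is assumed to accept plain SQL strings.

```javascript
// Sketch: run `routine` between BEGIN and COMMIT; issue ROLLBACK if it throws.
// Assumption: `connection.query` accepts a SQL string and returns a promise.
const transactionSketch = async (connection, routine) => {
  await connection.query('BEGIN');

  try {
    const result = await routine(connection);

    await connection.query('COMMIT');

    return result;
  } catch (error) {
    await connection.query('ROLLBACK');

    // Re-throw so the caller still sees the original failure.
    throw error;
  }
};

// Stub connection that records every statement it receives.
const createRecordingConnection = () => {
  const statements = [];

  return {
    statements,
    query: async (statement) => {
      statements.push(statement);

      return {rows: []};
    }
  };
};
```

With this shape the caller cannot leave a transaction open by forgetting a `COMMIT` or `ROLLBACK` on an error path.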

Protecting against unsafe value interpolation 🦄

// Note: This example is using unsupported API.
connection.query('SELECT $1', [
userInput
]);
// Note: This example is using unsupported API.
connection.query('SELECT \'' + userInput + '\'');
connection.query('SELECT 1');
connection.query(sql`SELECT 1`);
connection.query(sql`SELECT ${userInput}`);
connection.query(sql`
SELECT ${sql.identifier(['foo', 'a'])}
FROM (
VALUES ${sql.tupleList([['a1', 'b1', 'c1'], ['a2', 'b2', 'c2']])}
) foo(a, b, c)
WHERE foo.b IN (${sql.tuple(['c1', 'a2'])})
`);
SELECT "foo"."a"
FROM (
VALUES
($1, $2, $3),
($4, $5, $6)
) foo(a, b, c)
WHERE foo.b IN ($7, $8)
sql`
SELECT ${sql.raw('$1', ['foo'])}
`;
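
To illustrate how a tagged template can keep user input out of the SQL text, here is a minimal sketch of a `sql`-like tag. `sqlSketch` is a hypothetical name and covers only plain value interpolation; it does not model `sql.identifier`, `sql.tuple`, `sql.tupleList` or `sql.raw`.

```javascript
// Sketch of a `sql`-like tagged template: literal parts stay in the SQL text,
// interpolated values become positional parameters ($1, $2, ...).
const sqlSketch = (parts, ...values) => {
  let text = parts[0];

  values.forEach((value, index) => {
    // The value itself is never concatenated into the text.
    text += '$' + (index + 1) + parts[index + 1];
  });

  return {
    sql: text,
    values
  };
};

const userInput = "'; DROP TABLE foo; --";

const query = sqlSketch`SELECT id FROM foo WHERE bar = ${userInput}`;

// query.sql is 'SELECT id FROM foo WHERE bar = $1'
// query.values is ["'; DROP TABLE foo; --"]
```

Because the tag receives the literal parts and the values separately, the hostile string travels only as a bound parameter.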

Interceptors

  • afterPoolConnection – Executed after a connection is acquired from the connection pool (or a new connection is created).
  • afterQueryExecution – Must return the result of the query, which will be passed down to the client. Use it to modify the query result.
  • beforeQueryExecution – This function can optionally return a direct result of the query which will cause the actual query never to be executed.
  • beforeConnectionPoolRelease – Executed before connection is released back to the connection pool.
  • transformQuery – Executed before beforeQueryExecution. Transforms query.
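
The ordering of the query hooks described above can be sketched as a small pipeline. This is an illustration under simplifying assumptions, not Slonik's implementation: `executeWithInterceptors` is a hypothetical name, the hook signatures are reduced to `(query)` and `(query, result)`, and the sketch assumes that a result returned by `beforeQueryExecution` is handed back without running the later hooks.

```javascript
// Sketch: run a query through a list of interceptors. Each interceptor is a
// plain object that may define any of the query hooks named above.
const executeWithInterceptors = async (interceptors, query, executeQuery) => {
  let currentQuery = query;

  // 1. transformQuery runs first and may rewrite the query.
  for (const interceptor of interceptors) {
    if (interceptor.transformQuery) {
      currentQuery = await interceptor.transformQuery(currentQuery);
    }
  }

  // 2. beforeQueryExecution may short-circuit with a ready-made result.
  for (const interceptor of interceptors) {
    if (interceptor.beforeQueryExecution) {
      const earlyResult = await interceptor.beforeQueryExecution(currentQuery);

      if (earlyResult) {
        // Assumption of this sketch: the real query is never executed.
        return earlyResult;
      }
    }
  }

  let result = await executeQuery(currentQuery);

  // 3. afterQueryExecution must return the (possibly modified) result.
  for (const interceptor of interceptors) {
    if (interceptor.afterQueryExecution) {
      result = await interceptor.afterQueryExecution(currentQuery, result);
    }
  }

  return result;
};
```

A caching interceptor, for example, would return a stored result from `beforeQueryExecution` and leave the other hooks undefined.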

Built-in interceptors

Inserting large number of rows

await connection.query(sql`
INSERT INTO (foo, bar, baz)
VALUES ${sql.tupleList([
[1, 2, 3],
[4, 5, 6]
])}
`);
{
sql: 'INSERT INTO (foo, bar, baz) VALUES ($1, $2, $3), ($4, $5, $6)',
values: [
1,
2,
3,
4,
5,
6
]
}
  1. The generated SQL is dynamic and will vary depending on the input. (You will not be able to track query stats. Query parsing time increases with the query size.)
  2. There is a maximum number of parameters that can be bound to the statement (65535).
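
The second limitation is easy to quantify: with three columns per row, at most 21845 rows (65535 / 3) fit into one statement. If the tuple-list approach is kept anyway, rows have to be split into batches. A minimal sketch follows; `chunkRowsByParameterLimit` is a hypothetical helper, not part of Slonik.

```javascript
const MAX_BOUND_PARAMETERS = 65535;

// Sketch: split rows into batches so that (rows in batch * columns) never
// exceeds the bound-parameter limit.
const chunkRowsByParameterLimit = (rows, columnCount, limit = MAX_BOUND_PARAMETERS) => {
  if (columnCount < 1) {
    throw new Error('columnCount must be at least 1.');
  }

  const rowsPerBatch = Math.floor(limit / columnCount);

  if (rowsPerBatch < 1) {
    throw new Error('A single row exceeds the parameter limit.');
  }

  const batches = [];

  for (let offset = 0; offset < rows.length; offset += rowsPerBatch) {
    batches.push(rows.slice(offset, offset + rowsPerBatch));
  }

  return batches;
};
```

Batching works around the parameter limit only; the first limitation remains, because each batch size still produces a different SQL text.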
await connection.query(sql`
INSERT INTO (foo, bar, baz)
SELECT *
FROM ${sql.unnest(
[
[1, 2, 3],
[4, 5, 6]
],
[
'integer',
'integer',
'integer'
]
)}
`);
{
sql: 'INSERT INTO (foo, bar, baz) SELECT * FROM unnest($1::integer[], $2::integer[], $3::integer[])',
values: [
[
1,
4
],
[
2,
5
],
[
3,
6
]
]
}
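
The key step in the unnest approach is turning row-oriented tuples into one array per column, so the number of bound parameters equals the number of columns regardless of how many rows are inserted. The sketch below shows that transposition; `buildUnnestSketch` is a hypothetical helper and not how Slonik builds the fragment. It assumes the type names come from trusted code, since they are concatenated into the SQL text.

```javascript
// Sketch: build an unnest() fragment with one array parameter per column.
// Assumption: `columnTypes` contains trusted type names, never user input.
const buildUnnestSketch = (rows, columnTypes) => {
  const placeholders = columnTypes.map((typeName, index) => {
    return '$' + (index + 1) + '::' + typeName + '[]';
  });

  // Transpose: values[columnIndex] holds that column's value from every row.
  const values = columnTypes.map((typeName, columnIndex) => {
    return rows.map((row) => {
      return row[columnIndex];
    });
  });

  return {
    sql: 'unnest(' + placeholders.join(', ') + ')',
    values
  };
};
```

Adding more rows only makes the arrays longer; the SQL text and the parameter count stay the same, which addresses both limitations of the tuple-list form.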

What is the next big feature for Slonik?

Time to install Slonik

Bonus: Origin of the name

Meet Slonik

If (you support my open-source work through Buy me a coffee or Patreon) {you will have my eternal gratitude 🙌}

Founder, engineer interested in JavaScript, PostgreSQL and DevOps. Follow me on Twitter for outbursts about startups & engineering. https://twitter.com/kuizinas

Gajus Kuizinas
