Lessons learned scaling PostgreSQL database to 1.2bn records/month

Choosing where to host the database, materializing data, and using the database as a job queue

Dataset

How we monitor data aggregation and detect anomalies is a whole other topic. However, having PostgreSQL as the single source of truth about all of the data being aggregated, and about all of the processes that aggregate it, made this a lot easier.

Choosing where to host the database

postgres=> SELECT version();
PostgreSQL 9.6.6 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 4.8.4-2ubuntu1~14.04.3) 4.8.4, 64-bit
(1 row)

Update 2019-02-05:

Aiven.io has now rolled out auto_explain support, as well as the fixed TimescaleDB 1.2 version, as available maintenance updates.

Running out of disk space due to an unattended replication slot that kept the WAL growing.
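
The article does not include it, but a periodic check of pg_replication_slots would surface this early; on 9.6 (the version shown above), something along these lines:

SELECT
slot_name,
active,
pg_size_pretty(pg_xlog_location_diff(pg_current_xlog_location(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
ORDER BY pg_xlog_location_diff(pg_current_xlog_location(), restart_lsn) DESC;

An inactive slot retaining a large amount of WAL is the red flag. On PostgreSQL 10+ the equivalent functions are pg_current_wal_lsn() and pg_wal_lsn_diff().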

We’re looking into this issue and working with the timescale team, but response to most things isn’t immediate. Our help article at https://help.aiven.io/support/aiven-support-details describes the response times we provide.

Materializing data

CREATE MATERIALIZED VIEW venue_view AS
WITH
auditorium_with_future_events AS (
SELECT
e1.venue_id,
e1.auditorium_id
FROM event e1
WHERE
-- The 30 days interval ensures that we do not remove auditoriums
-- that are temporarily unavailable.
e1.start_time > now() - INTERVAL '30 day' AND
e1.auditorium_id IS NOT NULL
GROUP BY
e1.venue_id,
e1.auditorium_id
),
auditorium_with_future_events_count AS (
SELECT
awfe1.venue_id,
count(*) auditorium_count
FROM auditorium_with_future_events awfe1
GROUP BY
awfe1.venue_id
),
venue_auditorium_seat_count AS (
SELECT DISTINCT ON (e1.venue_id, e1.auditorium_id)
e1.venue_id,
e1.auditorium_id,
e1.seat_count
FROM auditorium_with_future_events awfe1
INNER JOIN event e1 ON e1.venue_id = awfe1.venue_id AND e1.auditorium_id = awfe1.auditorium_id
WHERE
e1.start_time > now() - INTERVAL '30 day' AND
e1.auditorium_id IS NOT NULL AND
e1.seat_count IS NOT NULL
ORDER BY
e1.venue_id,
e1.auditorium_id
),
venue_seat_count AS (
SELECT
vasc1.venue_id,
sum(vasc1.seat_count) seat_count
FROM venue_auditorium_seat_count vasc1
GROUP BY vasc1.venue_id
)
SELECT DISTINCT ON (v1.id)
v1.id,
v1.google_place_id,
v1.fuid,
v1.cinema_id,
v1.street_1,
v1.street_2,
v1.postcode,
v1.coordinates,
gp1.country_id,
gp1.timezone_id,
COALESCE(v1.phone_number, c1.phone_number) AS phone_number,
v1.display_name AS name,
COALESCE(v1.alternative_url, v1.url) AS url,
v1.permanently_closed_at,
awfec1.auditorium_count,
nearest_venue.id nearest_venue_id,
CASE
WHEN nearest_venue.id IS NULL
THEN NULL
ELSE round(ST_DistanceSphere(gp1.location, nearest_venue.location))
END nearest_venue_distance,
vsc1.seat_count seat_count
FROM venue v1
LEFT JOIN venue_seat_count vsc1 ON vsc1.venue_id = v1.id
LEFT JOIN google_place gp1 ON gp1.id = v1.google_place_id
LEFT JOIN LATERAL (
SELECT
v2.id,
gp2.location
FROM venue v2
INNER JOIN google_place gp2 ON gp2.id = v2.google_place_id
WHERE v2.id != v1.id
ORDER BY gp1.location <-> gp2.location
LIMIT 1
) nearest_venue ON TRUE
LEFT JOIN auditorium_with_future_events_count awfec1 ON awfec1.venue_id = v1.id
INNER JOIN cinema c1 ON c1.id = v1.cinema_id
WITH NO DATA;
CREATE UNIQUE INDEX ON venue_view (id);
CREATE INDEX ON venue_view (google_place_id);
CREATE INDEX ON venue_view (cinema_id);
CREATE INDEX ON venue_view (country_id);
CREATE INDEX ON venue_view (nearest_venue_id);
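
Refreshing this monolithic view, i.e.

REFRESH MATERIALIZED VIEW CONCURRENTLY venue_view;

recomputes every CTE on every refresh, even when only a small part of the underlying data has changed. Splitting the heavy CTEs into materialized views of their own lets each part be refreshed on its own schedule, with venue_view reusing their precomputed results:
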
CREATE MATERIALIZED VIEW auditorium_with_future_events_view AS
SELECT
e1.venue_id,
e1.auditorium_id
FROM event e1
WHERE
-- The 30 days interval ensures that we do not remove auditoriums
-- that are temporarily unavailable.
e1.start_time > now() - INTERVAL '30 day' AND
e1.auditorium_id IS NOT NULL
GROUP BY
e1.venue_id,
e1.auditorium_id
WITH NO DATA;
CREATE UNIQUE INDEX ON auditorium_with_future_events_view (venue_id, auditorium_id);
CREATE MATERIALIZED VIEW venue_auditorium_seat_count_view AS
SELECT DISTINCT ON (e1.venue_id, e1.auditorium_id)
e1.venue_id,
e1.auditorium_id,
e1.seat_count
FROM auditorium_with_future_events_view awfe1
INNER JOIN event e1 ON e1.venue_id = awfe1.venue_id AND e1.auditorium_id = awfe1.auditorium_id
WHERE
e1.start_time > now() - INTERVAL '30 day' AND
e1.auditorium_id IS NOT NULL AND
e1.seat_count IS NOT NULL
ORDER BY
e1.venue_id,
e1.auditorium_id
WITH NO DATA;
CREATE UNIQUE INDEX ON venue_auditorium_seat_count_view (venue_id, auditorium_id);
CREATE MATERIALIZED VIEW venue_view AS
WITH
auditorium_with_future_events_count AS (
SELECT
awfe1.venue_id,
count(*) auditorium_count
FROM auditorium_with_future_events_view awfe1
GROUP BY
awfe1.venue_id
),
venue_seat_count AS (
SELECT
vasc1.venue_id,
sum(vasc1.seat_count) seat_count
FROM venue_auditorium_seat_count_view vasc1
GROUP BY vasc1.venue_id
)
SELECT DISTINCT ON (v1.id)
v1.id,
v1.google_place_id,
v1.fuid,
v1.cinema_id,
v1.street_1,
v1.street_2,
v1.postcode,
v1.coordinates,
gp1.country_id,
gp1.timezone_id,
COALESCE(v1.phone_number, c1.phone_number) AS phone_number,
v1.display_name AS name,
COALESCE(v1.alternative_url, v1.url) AS url,
v1.permanently_closed_at,
awfec1.auditorium_count,
nearest_venue.id nearest_venue_id,
CASE
WHEN nearest_venue.id IS NULL
THEN NULL
ELSE round(ST_DistanceSphere(gp1.location, nearest_venue.location))
END nearest_venue_distance,
vsc1.seat_count seat_count
FROM venue v1
LEFT JOIN venue_seat_count vsc1 ON vsc1.venue_id = v1.id
LEFT JOIN google_place gp1 ON gp1.id = v1.google_place_id
LEFT JOIN LATERAL (
SELECT
v2.id,
gp2.location
FROM venue v2
INNER JOIN google_place gp2 ON gp2.id = v2.google_place_id
WHERE v2.id != v1.id
ORDER BY gp1.location <-> gp2.location
LIMIT 1
) nearest_venue ON TRUE
LEFT JOIN auditorium_with_future_events_count awfec1 ON awfec1.venue_id = v1.id
INNER JOIN cinema c1 ON c1.id = v1.cinema_id
WITH NO DATA;
CREATE UNIQUE INDEX ON venue_view (id);
CREATE INDEX ON venue_view (google_place_id);
CREATE INDEX ON venue_view (cinema_id);
CREATE INDEX ON venue_view (country_id);
CREATE INDEX ON venue_view (nearest_venue_id);
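
Because every view above has a UNIQUE index, it can be refreshed with REFRESH MATERIALIZED VIEW CONCURRENTLY without blocking readers. The views have to be refreshed in dependency order:

REFRESH MATERIALIZED VIEW CONCURRENTLY auditorium_with_future_events_view;
REFRESH MATERIALIZED VIEW CONCURRENTLY venue_auditorium_seat_count_view;
REFRESH MATERIALIZED VIEW CONCURRENTLY venue_view;

The schedule that drives these refreshes is stored in the database itself:
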
CREATE TABLE materialized_view_refresh_schedule (
id SERIAL PRIMARY KEY,
materialized_view_name citext NOT NULL,
refresh_interval interval NOT NULL,
last_attempted_at timestamp with time zone,
maximum_execution_duration interval NOT NULL DEFAULT '00:30:00'::interval
);
CREATE UNIQUE INDEX materialized_view_refresh_schedule_materialized_view_name_idx ON materialized_view_refresh_schedule(materialized_view_name citext_ops);
CREATE TABLE materialized_view_refresh_schedule_execution (
id integer DEFAULT nextval('materialized_view_refresh_id_seq'::regclass) PRIMARY KEY,
materialized_view_refresh_schedule_id integer NOT NULL REFERENCES materialized_view_refresh_schedule(id) ON DELETE CASCADE,
started_at timestamp with time zone NOT NULL,
ended_at timestamp with time zone,
execution_is_successful boolean,
error_name text,
error_message text,
terminated_at timestamp with time zone,
CONSTRAINT materialized_view_refresh_schedule_execution_check CHECK (terminated_at IS NULL OR ended_at IS NOT NULL)
);
CREATE INDEX materialized_view_refresh_schedule_execution_materialized_view_ ON materialized_view_refresh_schedule_execution(materialized_view_refresh_schedule_id int4_ops);
CREATE OR REPLACE FUNCTION schedule_new_materialized_view_refresh_schedule_execution()
RETURNS table(materialized_view_refresh_schedule_id int)
AS $$
BEGIN
RETURN QUERY
UPDATE materialized_view_refresh_schedule
SET last_attempted_at = now()
WHERE id IN (
SELECT mvrs1.id
FROM materialized_view_refresh_schedule mvrs1
LEFT JOIN LATERAL (
SELECT 1
FROM materialized_view_refresh_schedule_execution mvrse1
WHERE
mvrse1.ended_at IS NULL AND
mvrse1.materialized_view_refresh_schedule_id = mvrs1.id
) AS unended_materialized_view_refresh_schedule_execution ON TRUE
WHERE
unended_materialized_view_refresh_schedule_execution IS NULL AND
(
mvrs1.last_attempted_at IS NULL OR
mvrs1.last_attempted_at + mvrs1.refresh_interval < now()
)
ORDER BY mvrs1.last_attempted_at ASC NULLS FIRST
LIMIT 1
FOR UPDATE OF mvrs1 SKIP LOCKED
)
RETURNING id;
END
$$
LANGUAGE plpgsql;
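
The worker that performs the refreshes is not shown in the article; a minimal sketch of one iteration, assuming the schema above, looks like this (error handling, maximum_execution_duration enforcement and terminated_at bookkeeping are omitted):

DO $$
DECLARE
schedule_id int;
view_name text;
execution_id int;
BEGIN
-- Claim the next schedule that is due and has no unended execution;
-- zero rows returned leaves schedule_id NULL.
SELECT materialized_view_refresh_schedule_id
FROM schedule_new_materialized_view_refresh_schedule_execution()
INTO schedule_id;
IF schedule_id IS NULL THEN
RETURN;
END IF;
SELECT materialized_view_name
FROM materialized_view_refresh_schedule
WHERE id = schedule_id
INTO view_name;
-- Record the execution.
INSERT INTO materialized_view_refresh_schedule_execution (materialized_view_refresh_schedule_id, started_at)
VALUES (schedule_id, now())
RETURNING id INTO execution_id;
-- Run the refresh.
EXECUTE format('REFRESH MATERIALIZED VIEW CONCURRENTLY %I', view_name);
-- Close the execution.
UPDATE materialized_view_refresh_schedule_execution
SET ended_at = now(), execution_is_successful = true
WHERE id = execution_id;
END
$$;

Not everything is materialized into views; per-event seat-state counts are denormalized into columns on the event_seat_state_change table itself:
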
CREATE OR REPLACE FUNCTION materialize_event_seat_state_change()
RETURNS void
AS $$
BEGIN
WITH
event_seat_state_count AS (
SELECT
essc1.id,
count(*)::smallint seat_count,
count(*) FILTER (WHERE ss1.nid = 'BLOCKED')::smallint seat_blocked_count,
count(*) FILTER (WHERE ss1.nid = 'BROKEN')::smallint seat_broken_count,
count(*) FILTER (WHERE ss1.nid = 'EMPTY')::smallint seat_empty_count,
count(*) FILTER (WHERE ss1.nid = 'HOUSE')::smallint seat_house_count,
count(*) FILTER (WHERE ss1.nid = 'SOLD')::smallint seat_sold_count,
count(*) FILTER (WHERE ss1.nid = 'UNKNOWN')::smallint seat_unknown_count,
count(*) FILTER (WHERE ss1.id IS NULL)::smallint seat_unmapped_count,
count(*) FILTER (WHERE ss1.nid IN ('BLOCKED', 'BROKEN', 'HOUSE', 'SOLD', 'UNKNOWN')) seat_unavailable_count
FROM event e1
LEFT JOIN event_seat_state_change essc1 ON essc1.event_id = e1.id
LEFT JOIN event_seat_state_change_seat_state esscss1 ON esscss1.event_seat_state_change_id = essc1.id
LEFT JOIN cinema_foreign_seat_state fcss1 ON fcss1.id = esscss1.cinema_foreign_seat_state_id
LEFT JOIN seat_state ss1 ON ss1.id = fcss1.seat_state_id
WHERE
essc1.id IN (
SELECT id
FROM event_seat_state_change
WHERE
materialized_at IS NULL
ORDER BY materialized_at DESC
LIMIT 100
)
GROUP BY essc1.id
)
UPDATE event_seat_state_change essc1
SET
materialized_at = now(),
seat_count = essc2.seat_count,
seat_blocked_count = essc2.seat_blocked_count,
seat_broken_count = essc2.seat_broken_count,
seat_empty_count = essc2.seat_empty_count,
seat_house_count = essc2.seat_house_count,
seat_sold_count = essc2.seat_sold_count,
seat_unknown_count = essc2.seat_unknown_count,
seat_unmapped_count = essc2.seat_unmapped_count
FROM event_seat_state_count essc2
WHERE
essc1.id = essc2.id;
END
$$
LANGUAGE plpgsql
SET work_mem='1GB'
SET max_parallel_workers_per_gather=4;
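
materialize_event_seat_state_change processes at most 100 unmaterialized event_seat_state_change rows per call, so it has to be invoked repeatedly:

SELECT materialize_event_seat_state_change();

The article does not show how the call is scheduled; any periodic runner works.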

Using database as a job queue

Building simple, reliable, and efficient concurrent work queues using PostgreSQL.

Each transaction scans the table and skips over locked rows, so with high numbers of active workers it can land up doing a bit of work to acquire a new item. It’s not just popping items off a stack. The query will probably have to walk an index with an index scan, fetching each candidate item from the heap and checking the lock status. With any reasonable queue this will all be in memory but it’s still a fair bit of churn.
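
Distilled to its essentials, the claim-a-row pattern the quote describes looks like this (a generic sketch against a hypothetical task table, not the article's schema; the actual implementation follows):

UPDATE task
SET attempted_at = now()
WHERE id = (
SELECT id
FROM task
WHERE attempted_at IS NULL
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id;

Each concurrent worker walks the same index, skips rows that other transactions have already locked, and claims the first unlocked candidate.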

CREATE OR REPLACE FUNCTION schedule_cinema_data_task()
RETURNS table(cinema_data_task_id int)
AS $$
DECLARE
scheduled_cinema_data_task_id int;
BEGIN
UPDATE
cinema_data_task_queue
SET
attempted_at = now()
WHERE
id = (
SELECT cdtq1.id
FROM cinema_data_task_queue cdtq1
WHERE cdtq1.attempted_at IS NULL
ORDER BY cdtq1.id ASC
LIMIT 1
FOR UPDATE OF cdtq1 SKIP LOCKED
)
RETURNING cinema_data_task_queue.cinema_data_task_id
INTO scheduled_cinema_data_task_id;
UPDATE cinema_data_task
SET last_attempted_at = now()
WHERE id = scheduled_cinema_data_task_id;
RETURN QUERY SELECT scheduled_cinema_data_task_id;
END
$$
LANGUAGE plpgsql
SET work_mem='100MB';
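
A worker claims work with a single call:

SELECT cinema_data_task_id FROM schedule_cinema_data_task();

A NULL cinema_data_task_id means nothing could be claimed, either because the queue is drained or because every outstanding row is currently locked by another worker, so the caller should back off and retry.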
CREATE OR REPLACE FUNCTION update_cinema_data_task_queue()
RETURNS void
AS $$
DECLARE
outstanding_task_count int;
BEGIN
SELECT count(*)
FROM cinema_data_task_queue
WHERE attempted_at IS NULL
INTO outstanding_task_count;
IF outstanding_task_count < 100 THEN
INSERT INTO cinema_data_task_queue (cinema_data_task_id)
SELECT
cdtq1.cinema_data_task_id
FROM cinema_data_task_queue(100, 50, 100, false) cdtq1
WHERE
NOT EXISTS (
SELECT 1
FROM cinema_data_task_queue
WHERE
cinema_data_task_id = cdtq1.cinema_data_task_id AND
attempted_at IS NULL
)
ON CONFLICT (cinema_data_task_id) WHERE attempted_at IS NULL
DO NOTHING;
END IF;
END
$$
LANGUAGE plpgsql
SET work_mem='50MB';
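
update_cinema_data_task_queue only tops the queue up when fewer than 100 unattempted rows remain, so it is cheap to call frequently. The article does not say how it is scheduled; assuming the pg_cron extension is available, a once-a-minute top-up would look like:

SELECT cron.schedule('* * * * *', 'SELECT update_cinema_data_task_queue()');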

Misc

Identifying latency between different cloud providers.
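
One quick way to compare round-trip latency from candidate application hosts to the database (an assumed approach; the article does not detail how latency was measured) is psql's \timing, which prints the client-observed time of every statement:

\timing on
SELECT 1;

Running this from a machine in each provider and region against the same database gives a rough latency baseline.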

Bonus: Slonik PostgreSQL client

Acknowledgements

If (you support my open-source work through Buy me a coffee or Patreon) {you will have my eternal gratitude 🙌}
