Engineering

Part 2: Business Intelligence Data Indexing from PostgreSQL using Algolia – Implementation and conclusion

We invited our friends at Starschema to write about an example of using Algolia in combination with PostgreSQL. Enjoy this two-part series by Software Developer Mátyás Budavári!

If you would like to jump back to part one, where the use case and proposed solution are covered, click here.


In this post, we will walk through a simplified implementation of our search index solution.

To let our users access our audit data from an external location, we need to send it to a place where they can reach it. Let's walk through the implementation details of how we achieve that.

To mimic anonymized data, we're going to use randomly generated numeric IDs, focus on the structure, and omit the JSON data type from the example.

You can check out the source code on GitHub. You can try it out with a free-tier Algolia account. For the Installation Steps, check out the README at the root of the GitHub repository.

Indexing Business Intelligence Data

Implementation Details

Let’s dive into our solution in detail. Our demo follows a simple pattern.

We create new audit log lines periodically with a single producer and read these new lines with multiple data consumers.

For the sake of simplicity, we’ve created a minimal example for the services.

  • The Producer puts random activity in the database
  • The Consumer reads the queue and uploads the data into Algolia

To keep things readable and to the point, I used only the essential external libraries and kept the code as simple as possible.

This example does not reflect our actual codebase but gives you an idea of how it can be implemented with minimal overhead.

Getting started

Let’s go through what you need to get started as seen in the repository README.

All components and dependencies are described in the docker-compose.yml file.

  • To run the example as intended, you must have Docker installed.
  • You need to create a .env file based on the .env.example file.
  • You need to register with Algolia and set up your credentials in the .env file.
  • Note that the docker-compose file uses environment variables. It will only work as intended if it's started from the folder where the .env file is located.

The example can be started with:

docker-compose up --build

After the start, the following should happen.

  1. PostgreSQL container starts
  2. PostgreSQL container initializes the database with mock data
  3. After the PostgreSQL container is in a healthy state, a single producer and two consumers start up
  4. The applications wait and do their jobs periodically, at the intervals set in the DELAY_PRODUCER and DELAY_CONSUMER environment variables.

Component Details

Let’s walk through each component and how they interact with each other.

PostgreSQL Database

We run the official docker image of PostgreSQL.

Upon first start, it:

  1. Creates a DB folder (or optionally a volume) to persist the database between runs
  2. Runs the SQL scripts inside docker-entrypoint-initdb.d to initialize the database

Init Scripts

The PostgreSQL Docker image has a way to load initial data into the database upon the first creation of the volume. You can find more general information on how this works in the documentation on the PostgreSQL image's Docker Hub page, under the 'Initialization scripts' section.

The init scripts run in alphabetical order. The first set of SQL files (001_init_audit_log_table.sql and 002_queue_table.sql) creates the tables used by the applications.

  1. Create the app schema
  2. Create the app.audit_log fact table
  3. Create the app.queue_audit_log queue table

After the tables are ready, 003_queue_trigger.sql creates a trigger to catch the new data inserted into the app.audit_log table and replicate it to the app.queue_audit_log queue.

create or replace function audit_insert_trigger_fnc()
  returns trigger as $$
    begin
        insert into
            app.queue_audit_log (
             action
            ,user_id
            ,content_item_id
            ,create_date
            )
        values(
             new."action"
            ,new."user_id"
            ,new."content_item_id"
            ,new."create_date"
        );

        return new;
    end;
$$ language 'plpgsql';

create trigger audit_insert_trigger
  after insert on app.audit_log
  for each row
  execute procedure audit_insert_trigger_fnc();

When the structure is ready and the trigger is in place, the last script (004_generate_mock_data.sql) generates random data into the fact table. Its configurable parts are extracted into variables, so we can see how it behaves for different amounts of data. The randomizer has a hard-coded initial seed, so it generates the same data across multiple re-creations.

-- set random seed for repeatable random data generation
SELECT setseed(0.8);
DO $$
    DECLARE
        -- configurable parameters for data generation
        nr_lines integer := 20;
        user_min integer := 10;
        user_max integer := 20;
        citm_min integer := 1500;
        citm_max integer := 2300;
        actn_min integer := 1;
        actn_max integer := 3;
    BEGIN
        with
            -- generate user_ids
            users as (
                select generate_series(user_min, user_max) as user_id
            )
            -- generate content_ids
           ,content as (
               select generate_series(citm_min, citm_max) as content_id
            )
            -- generate action_ids
           ,actions as (
               select generate_series(actn_min, actn_max) as action_id
            )
            -- get the cartesian product of the above in a random sort
           ,limited_data as (
               select
                 random() randomizer
                 ,*
               from users, content, actions
               order by randomizer
               limit nr_lines
            )
        insert
            into app.audit_log (
                action
                ,user_id
                ,content_item_id
            )
            select
                 action_id
                ,user_id
                ,content_id
            from limited_data
        ;
END $$
;

-- view data
-- select * from audit_log order by content_item_id, user_id, action;

This mock data generation script uses controlled random data generation by setting the initial random seed at the start of the code with setseed. We generate a random() number for each generated line and use it to shuffle the rows, so the limited selection doesn't always contain similar lines. We generate identifiers with generate_series between the configurable ranges for each value. To select only the given number of items, we cap the result set with limit.

To keep the code well separated, the different logical components are defined in their own Common Table Expressions (CTEs), created with WITH queries. In the limited_data CTE, we combine all generated lines for the different data types and shuffle them before limiting the results.

The official PostgreSQL Docker image only runs the database init scripts on the first start of the database. If you stop the services and then restart them, the initialization will not happen again, but the data will remain.

Producer

The code inside the ./producer folder represents our application. In our scenario, we don’t want to modify this code but leverage the power of Algolia through PostgreSQL.

  • It connects to the database on start.
  • It periodically generates a single new random log line in the fact table.
  • Outside of this application's scope, the insert trigger copies the new data into the queue.

This is a straightforward application. The main function is where most of the action happens. The db folder contains a PostgreSQL connector in db.go, and an insert statement in sql.go.
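
To make this more concrete, here is a minimal sketch of what such a producer loop could look like, using Go's standard database/sql package with the lib/pq driver. The connection-string variable name (POSTGRES_DSN), the duration format of DELAY_PRODUCER, and the exact insert statement are assumptions for this sketch, not the repository's actual code.

package main

import (
	"database/sql"
	"log"
	"math/rand"
	"os"
	"time"

	_ "github.com/lib/pq" // PostgreSQL driver
)

func main() {
	// POSTGRES_DSN is an assumed variable name for the connection string.
	db, err := sql.Open("postgres", os.Getenv("POSTGRES_DSN"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// DELAY_PRODUCER controls how often a new log line is produced;
	// the duration format (e.g. "5s") is an assumption for this sketch.
	delay, err := time.ParseDuration(os.Getenv("DELAY_PRODUCER"))
	if err != nil {
		delay = 5 * time.Second
	}

	for {
		// Insert one random audit line; the database trigger copies it
		// into app.queue_audit_log for the consumers to pick up.
		_, err := db.Exec(
			`insert into app.audit_log (action, user_id, content_item_id)
			 values ($1, $2, $3)`,
			rand.Intn(3)+1,      // action between 1 and 3
			rand.Intn(11)+10,    // user_id between 10 and 20
			rand.Intn(801)+1500, // content_item_id between 1500 and 2300
		)
		if err != nil {
			log.Println("insert failed:", err)
		}
		time.Sleep(delay)
	}
}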

Consumer

The consumer services under the ./consumer folder read the last inserted lines from the database and put them into our Algolia index.

Each consumer connects to the database on start, then periodically reads the queue for new data and does the following in a transaction:

  1. Reads the last N lines and then marks them as visited
  2. Uploads the selected lines into Algolia
  3. Clears the visited lines from the queue

We assume that multiple consumers may be needed to adjust to heavy loads. We cannot rely on these consumers running at all times, so we cannot leverage PostgreSQL's notify/listen pattern. We use a queue table instead.

The heart of this concept lies in the following SQL query:

with get_lines as (
  select
      id
    , action
    , user_id
    , content_item_id
    , create_date
    , _visited
  from app.queue_audit_log
  where _visited = false
  order by create_date desc
  limit $1
  for update skip locked -- add concurrent consumers
)
update
  app.queue_audit_log new
set _visited = true
from get_lines old
where old.id = new.id
returning
    new.id
  , new.action
  , new.user_id
  , new.content_item_id
  , new.create_date
;

Let's break down what this query does into steps. All of these steps happen at once, in a single instruction, inside a transaction started by our Go code.

To let our queue table be accessed by multiple consumers, we need to add for update skip locked. The for update clause lets the engine know that the rows returned by the select subquery will be updated. The skip locked part ignores the lines that other consumers might have locked.

With SKIP LOCKED, any selected rows that cannot be immediately locked are skipped. Skipping locked rows provides an inconsistent view of the data, so this is not suitable for general-purpose work but can be used to avoid lock contention with multiple consumers accessing a queue-like table.

  • The order by create_date desc ensures that we get the latest available lines from the queue.
  • The limit $1 line ensures that we only select a subset of lines from the queue.
  • The returning clause lets us get the data of each selected line.
  • In the update statement, we set _visited = true only for the currently unvisited lines, filtered by where _visited = false. This is a safety measure: in theory, we should never see _visited = true outside a transaction. This could be simplified further by deleting these lines inside the transaction, as the sketch below does.
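
To tie the query back to the consumer's transaction, here is a simplified sketch of one consumer iteration, assuming the v3 Algolia Go API client and database/sql. The auditRecord struct, the processQueue helper, and the attribute names are hypothetical and only illustrate the read-mark, upload, clean-up, commit flow.

package consumer

import (
	"database/sql"
	"time"

	"github.com/algolia/algoliasearch-client-go/v3/algolia/search"
)

// auditRecord mirrors the queue columns. The objectID field lets Algolia
// replace a record instead of duplicating it if the same line is uploaded twice.
type auditRecord struct {
	ObjectID            string    `json:"objectID"`
	Action              int       `json:"action"`
	UserID              int       `json:"userId"`
	ContentItemID       int       `json:"contentItemId"`
	CreateDate          time.Time `json:"createDate"`
	CreateDateTimestamp int64     `json:"createDateTimestamp"` // numeric copy for range filters
}

// dequeueSQL is the queue-reading query shown above.
const dequeueSQL = `
with get_lines as (
    select id, action, user_id, content_item_id, create_date, _visited
    from app.queue_audit_log
    where _visited = false
    order by create_date desc
    limit $1
    for update skip locked
)
update app.queue_audit_log new
set _visited = true
from get_lines old
where old.id = new.id
returning new.id, new.action, new.user_id, new.content_item_id, new.create_date`

// processQueue runs one consumer iteration: it dequeues up to batchSize lines
// inside a transaction, uploads them to Algolia, and clears the visited lines
// before committing.
func processQueue(db *sql.DB, index *search.Index, batchSize int) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback() // a no-op once Commit has succeeded

	rows, err := tx.Query(dequeueSQL, batchSize)
	if err != nil {
		return err
	}
	var records []auditRecord
	for rows.Next() {
		var r auditRecord
		if err := rows.Scan(&r.ObjectID, &r.Action, &r.UserID, &r.ContentItemID, &r.CreateDate); err != nil {
			return err
		}
		r.CreateDateTimestamp = r.CreateDate.Unix()
		records = append(records, r)
	}
	if err := rows.Err(); err != nil {
		return err
	}
	if len(records) == 0 {
		return tx.Commit() // nothing new this round
	}

	// Upload the batch into the Algolia index. If this fails, the rollback
	// leaves the lines unvisited so a later run can retry them.
	if _, err := index.SaveObjects(records); err != nil {
		return err
	}

	// Clear the uploaded lines from the queue inside the same transaction.
	if _, err := tx.Exec(`delete from app.queue_audit_log where _visited = true`); err != nil {
		return err
	}
	return tx.Commit()
}

A caller would invoke something like processQueue on a timer driven by DELAY_CONSUMER, which is essentially what each consumer service does every cycle.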

Usage

With the data in Algolia, our analytics team can write custom queries to search for the data they’re interested in.

{
  "filters": "createDateTimestamp > 1655127131 AND userId=12 AND action=2"
}

This searches for all actions with ID 2, performed by user 12, that were added after 1655127131 (Monday, June 13, 2022, 1:32:11 PM). Epochconverter is a handy tool to convert between timestamps and dates.
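
For illustration, this is roughly how such a filtered query could be issued from the v3 Algolia Go API client; the index name audit_log and the placeholder credentials are assumptions for this sketch.

package main

import (
	"fmt"
	"log"

	"github.com/algolia/algoliasearch-client-go/v3/algolia/opt"
	"github.com/algolia/algoliasearch-client-go/v3/algolia/search"
)

func main() {
	// Credentials come from the Algolia dashboard; "audit_log" is an assumed index name.
	client := search.NewClient("YourApplicationID", "YourSearchOnlyAPIKey")
	index := client.InitIndex("audit_log")

	// An empty query string means we only filter; no free-text search is done.
	res, err := index.Search("",
		opt.Filters("createDateTimestamp > 1655127131 AND userId=12 AND action=2"),
	)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("found %d matching audit lines\n", res.NbHits)
}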

In the future, we plan to extend our web interface with custom selectors based on our analytics team’s needs. Algolia has an easy way to connect search queries with ready-made frontend components.

Conclusion

This simplified example captures the essence of our solution. We were able to fulfill our goals: stay close to PostgreSQL, implement the data consumers as separate services, and not let any log lines slip through our queue.

With this implementation, Algolia helped us fulfill our business objectives: we can distribute our analytics to an external location outside our databases, and our insight processing has improved significantly.


We hope that you enjoyed this in-depth article from Mátyás, and if you are looking for more content like this, we have many more topics that we’ve covered on the Algolia Blog! If you’re new to Algolia, you can try it out by signing up for a free tier account.

About the author

Mátyás Budavári

Software Developer Team Lead
