Optimize Postgres Cursor Pagination

This query is used to implement GraphQL cursor pagination (first, last, after, before, offset, order).



-- __l1__: the requested page of countries. Each row carries a __cursor__,
-- which is the JSON text of [[sort column names], [sort column values]].
WITH __l1__ AS (
    SELECT
        CAST(
            json_build_array(
                json_build_array('id', 'alpha_3'),
                json_build_array(id, alpha_3)
            ) AS TEXT
        ) AS __cursor__,
        *
    FROM countries
    ORDER BY id ASC, alpha_3 ASC
    LIMIT 10 OFFSET 0
),
-- __l2__: a single row of flags. Each flag probes the whole table for a row
-- whose cursor text lies beyond the largest / smallest cursor of the page.
__l2__ AS (
    SELECT
        EXISTS (
            SELECT 1
            FROM countries
            WHERE CAST(
                json_build_array(
                    json_build_array('id', 'alpha_3'),
                    json_build_array(id, alpha_3)
                ) AS TEXT
            ) > (SELECT MAX(__cursor__) FROM __l1__)
        ) AS __has_prev_page__,
        EXISTS (
            SELECT 1
            FROM countries
            WHERE CAST(
                json_build_array(
                    json_build_array('id', 'alpha_3'),
                    json_build_array(id, alpha_3)
                ) AS TEXT
            ) < (SELECT MIN(__cursor__) FROM __l1__)
        ) AS __has_next_page__
)
-- Attach the two flags to every page row, then emit the rows in reverse
-- row-number order.
SELECT *
FROM __l1__
CROSS JOIN __l2__
ORDER BY ROW_NUMBER() OVER () DESC
Rust code:
// Resolves one page of rows from `TABLE_NAME` as a connection of `Country` edges.
//
// The SQL is assembled in three parts:
//   * `__l1__` selects the page (filter, order, LIMIT/OFFSET) and tags every
//     row with a `__cursor__` (JSON text of the sort column names and values);
//   * `__l2__` computes `__has_prev_page__` / `__has_next_page__` by probing the
//     table for cursors beyond the page's MIN/MAX cursor;
//   * the final SELECT joins both and re-orders the page rows.
//
// The search keyword is sent as a bind parameter (`$1`) and is never
// interpolated into the SQL text. Only values produced by `OrderColumn`,
// `TABLE_NAME` and numeric limit/offset are formatted into the statement.
query(after, before, first, last, |after, before, first, last| async move {
    // NOTE(review): `after` and `before` are received here but are not
    // referenced anywhere below, so the generated SQL does not seek by cursor.
    let orders = orders.unwrap_or_else(|| vec![CountryOrder::IdAsc]);

    // Direction of the first requested order column; defaults to ascending
    // when no order is given.
    let is_desc = orders
        .first()
        .map(|order| OrderColumn::is_desc(order))
        .unwrap_or(false);
    // NOTE(review): the direction is inverted when `last` is absent. This looks
    // right for descending orders but should be verified for `first` combined
    // with an ascending order — confirm against `OrderColumn`.
    let is_desc = if last.is_some() { is_desc } else { !is_desc };

    // Filter fragments contain placeholders only; the keyword value itself is
    // bound to `$1` further down, which closes the SQL-injection hole that
    // formatting the raw keyword into the statement would open.
    let mut filters: Vec<&str> = Vec::new();
    if keyword.is_some() {
        filters.push("id ~* $1 OR alpha_2 ~* $1 OR alpha_3 ~* $1 OR name ILIKE '%' || $1 || '%'");
    }

    let filters = if filters.is_empty() {
        String::default()
    } else {
        format!("WHERE {}", filters.iter().map(|filter| format!("({})", filter)).join(" AND "))
    };

    let l1_orders = orders
        .iter()
        .map(|order| if is_desc {
            OrderColumn::query_string_flip(order)
        } else {
            OrderColumn::query_string(order)
        })
        .join(", ");

    // (prev, next) comparison operators and aggregate functions used by the
    // has-prev/has-next probes, plus the direction of the final re-ordering.
    let (page_logic, page_fn, l3_order) = if is_desc {
        ((">", "<"), ("MAX", "MIN"), "DESC")
    } else {
        (("<", ">"), ("MIN", "MAX"), "ASC")
    };

    // `first` wins over `last`; without either the page is capped at 1000 rows.
    let limit = first.or(last).unwrap_or(1000);

    // NOTE(review): the two EXISTS probes scan the whole table without
    // `{filters}`, so the page flags ignore the keyword filter.
    let q = format!(r#"WITH __l1__ AS(
        SELECT json_build_array(json_build_array({cursor_keys}), json_build_array({cursor_columns}))::TEXT AS __cursor__, *
        FROM {table_name}
        {filters}
        ORDER BY {l1_orders}
        LIMIT {limit}
        OFFSET {offset}
    ), __l2__ AS(
        SELECT EXISTS(
            SELECT true FROM {table_name} WHERE json_build_array(json_build_array({cursor_keys}), json_build_array({cursor_columns}))::TEXT {prev_page_logic} (
                SELECT {prev_page_fn}(__cursor__) FROM __l1__
            )
        ) AS __has_prev_page__,
        EXISTS(
            SELECT true FROM {table_name} WHERE json_build_array(json_build_array({cursor_keys}), json_build_array({cursor_columns}))::TEXT {next_page_logic} (
                SELECT {next_page_fn}(__cursor__) FROM __l1__
            )
        ) AS __has_next_page__
    )
    SELECT * FROM __l1__, __l2__ ORDER BY ROW_NUMBER() OVER() {l3_order}"#,
                    table_name = TABLE_NAME,
                    cursor_keys = orders.iter().map(|order| format!("'{}'", OrderColumn::column_name(order))).join(", "),
                    cursor_columns = orders.iter().map(|order| OrderColumn::column_name(order)).join(", "),
                    limit = limit,
                    filters = filters,
                    l1_orders = l1_orders,
                    offset = offset.unwrap_or(0),
                    prev_page_logic = page_logic.0,
                    prev_page_fn = page_fn.0,
                    next_page_logic = page_logic.1,
                    next_page_fn = page_fn.1,
                    l3_order = l3_order,
    );

    println!("{}", q);

    // Bind the keyword only when the statement actually contains `$1`;
    // supplying a parameter the statement does not declare is an error.
    let mut statement = sqlx::query(q.as_str());
    if let Some(keyword) = keyword.as_deref() {
        statement = statement.bind(keyword);
    }

    // Every row repeats the same two page flags, so the last row seen decides
    // them; an empty page leaves both at `false`.
    let mut has_prev_page = false;
    let mut has_next_page = false;
    let data = statement
        .fetch(pool)
        .map_ok(|row| {
            has_prev_page = row.get("__has_prev_page__");
            has_next_page = row.get("__has_next_page__");
            let cursor: String = row.get("__cursor__");

            Edge::new(encode(cursor), Country {
                id: row.get("id"),
                alpha_2: row.get("alpha_2"),
                alpha_3: row.get("alpha_3"),
                name: row.get("name"),
            })
        })
        .map_err(Arc::new)
        .try_collect::<Vec<Edge<String, Country, EmptyFields>>>()
        .await?;

    let mut connection = Connection::new(has_prev_page, has_next_page);
    connection.append(data.into_iter());
    Ok::<_, Error>(connection)
}).await

Comments

Leave a Reply

Your email address will not be published. Required fields are marked *