How can I fix the opening/closing of date ranges
06:43 26 Nov 2025

I have to adapt the rfm metrics logic to bank products, in this case deposits
The business logic explained to me is such:

recency - last date of a positive balance on any of the deposit accounts of a customer

frequency - culminative number of all deposit accounts of a customer

monetary - sum of max balances of all deposit accounts of a customer

the rules for new row inserts and updates:
a new date range start whenever a customer goes from zero balance to positive
lets say I have a customer who had positive balances from 2020-01-08 untill 2020-05-09

on 2020-05-09 all of its deposits balances became 0

he will have an open ended date range [2020-01-08) with recency 2020-05-09
this date range will only close and a new one will be inserted only when he once again comes back from 0

I have been at it for several days, but I just cannot comprehend how can I make it work...

Here is the code I have so far:

-- DROP PROCEDURE feature_store.rfm_deposits_load_inc(date);

CREATE OR REPLACE PROCEDURE feature_store.rfm_deposits_load_inc(IN load_date date)
 LANGUAGE plpgsql
AS $procedure$
DECLARE
    prev_load_date date := load_date - interval '1 day';
    insert_cols text;
    hash_expr text;
    result_sql text;
BEGIN
    -- Step 1: Rollback if already processed
    if exists (
        select 1
        from feature_store.rfm_deposits
        where date_range >> daterange(prev_load_date, load_date)
    ) then
raise notice 'Already calculated for this load_date, rolling back...';

        delete from feature_store.rfm_deposits
        where date_range >> daterange(prev_load_date, load_date);

        update feature_store.rfm_deposits
        set date_range = daterange(lower(date_range), null)
        where prev_load_date <@ date_range and date_range << daterange('9999-12-31'::date, null);

        raise notice 'Rollback complete';
    end if;

    DROP TABLE IF EXISTS temp_today_balances;
    CREATE TEMP TABLE temp_today_balances AS
    SELECT
        d.customer_hub_id,
        d.customer_id,
        d.main_account_hub_id AS account_hub_id,
        MAX(ab.sumn) AS balance_today
    FROM l2.deposits d
    JOIN l2.accounts_balances ab
      ON ab.account_hub_id = d.main_account_hub_id
     AND load_date >= lower(ab.date_range)
     AND (upper(ab.date_range) IS NULL OR load_date < upper(ab.date_range))
    INNER JOIN l1_dbo.hs_customer_onlinebankdb_active c 
      ON c.hub_id = d.customer_hub_id
    WHERE d.open_date >= '2020-01-01' 
      AND c.customer_type_id = 1  
    GROUP BY d.customer_hub_id, d.customer_id, d.main_account_hub_id;

    CREATE INDEX ON temp_today_balances(customer_hub_id);
    ANALYZE temp_today_balances;


    DROP TABLE IF EXISTS temp_customer_agg;
    CREATE TEMP TABLE temp_customer_agg AS
    SELECT
        customer_hub_id,
        MAX(customer_id) AS customer_id,
        SUM(balance_today) AS total_balance,
        COUNT(DISTINCT account_hub_id) AS total_accounts,
        BOOL_OR(balance_today > 0) AS is_active
    FROM temp_today_balances
    GROUP BY customer_hub_id;

    CREATE INDEX ON temp_customer_agg(customer_hub_id);
    ANALYZE temp_customer_agg;

    DROP TABLE IF EXISTS temp_prev_state;
    CREATE TEMP TABLE temp_prev_state AS
    SELECT
        r.customer_hub_id,
        r.customer_id,
        r.deposits_recency,
        r.deposits_frequency,
        r.deposits_monetary,
        r.last_max_balance,
        lower(r.date_range) AS range_start,
        upper(r.date_range) AS range_end
    FROM feature_store.rfm_deposits r
    WHERE prev_load_date >= lower(r.date_range)
      AND (upper(r.date_range) IS NULL OR prev_load_date < upper(r.date_range));

    CREATE INDEX ON temp_prev_state(customer_hub_id);
    ANALYZE temp_prev_state;

    DROP TABLE IF EXISTS temp_kv;
    CREATE TEMP TABLE temp_kv AS
    SELECT
        COALESCE(p.customer_hub_id, t.customer_hub_id) AS customer_hub_id,
        COALESCE(p.account_hub_id, t.account_hub_id) AS account_hub_id,
        GREATEST(
            COALESCE((p.value)::numeric, 0),
            COALESCE(t.balance_today, 0)
        ) AS max_balance
    FROM (
        SELECT ps.customer_hub_id, kv.key::bigint AS account_hub_id, kv.value
        FROM temp_prev_state ps
        LEFT JOIN LATERAL jsonb_each_text(COALESCE(ps.last_max_balance, '{}'::jsonb)) kv ON true
    ) p
    FULL JOIN temp_today_balances t
      ON p.customer_hub_id = t.customer_hub_id
     AND p.account_hub_id = t.account_hub_id;

    DROP TABLE IF EXISTS temp_monetary;
    CREATE TEMP TABLE temp_monetary AS
    SELECT
        customer_hub_id,
        jsonb_object_agg(account_hub_id::text, max_balance) AS last_max_balance_new,
        SUM(max_balance) AS deposits_monetary_new
    FROM temp_kv
    GROUP BY customer_hub_id;

    CREATE INDEX ON temp_monetary(customer_hub_id);
    ANALYZE temp_monetary;

    DROP TABLE IF EXISTS temp_transitions;
    CREATE TEMP TABLE temp_transitions AS
    SELECT
        COALESCE(a.customer_hub_id, p.customer_hub_id) AS customer_hub_id,
        COALESCE(a.customer_id, p.customer_id) AS customer_id,
        COALESCE(a.total_accounts, p.deposits_frequency, 0) AS deposits_frequency_new,
        COALESCE(m.deposits_monetary_new, p.deposits_monetary, 0) AS deposits_monetary_new,
        COALESCE(m.last_max_balance_new, p.last_max_balance, '{}'::jsonb) AS last_max_balance_new,
        a.is_active AS active_today,
        p.deposits_recency AS prev_recency,
        CASE
          WHEN COALESCE(a.is_active, FALSE) THEN load_date
          ELSE p.deposits_recency
        END AS deposits_recency_new
    FROM temp_prev_state p
    FULL JOIN temp_customer_agg a ON a.customer_hub_id = p.customer_hub_id
    LEFT JOIN temp_monetary m ON m.customer_hub_id = COALESCE(a.customer_hub_id, p.customer_hub_id);

    CREATE INDEX ON temp_transitions(customer_hub_id);
    ANALYZE temp_transitions;

    UPDATE feature_store.rfm_deposits t
    SET deposits_recency = tr.deposits_recency_new,
        deposits_monetary = tr.deposits_monetary_new,
        deposits_frequency = tr.deposits_frequency_new,
        last_max_balance = tr.last_max_balance_new,
        last_updated_dwh = now()
    FROM temp_transitions tr
    WHERE prev_load_date >= lower(t.date_range)
      AND (upper(t.date_range) IS NULL OR prev_load_date < upper(t.date_range))
      AND t.customer_hub_id = tr.customer_hub_id;

    DROP TABLE IF EXISTS temp_new_customers;
    CREATE TEMP TABLE temp_new_customers AS
    SELECT
        a.customer_hub_id,
        a.customer_id,
        load_date AS deposits_recency,
        a.total_accounts AS deposits_frequency,
        m.deposits_monetary_new AS deposits_monetary,
        m.last_max_balance_new AS last_max_balance,
        daterange(load_date, NULL) AS date_range
    FROM temp_customer_agg a
    JOIN temp_monetary m USING (customer_hub_id)
    LEFT JOIN feature_store.rfm_deposits r ON r.customer_hub_id = a.customer_hub_id
    WHERE a.is_active = TRUE
      AND r.customer_hub_id IS NULL;

    INSERT INTO feature_store.rfm_deposits (
        customer_hub_id,
        customer_id,
        deposits_recency,
        deposits_frequency,
        deposits_monetary,
        date_range,
        hash,
        last_max_balance
    )
    SELECT
        n.customer_hub_id,
        n.customer_id,
        n.deposits_recency,
        n.deposits_frequency,
        n.deposits_monetary,
        n.date_range,
        md5(
          n.customer_hub_id::text || '^?$%' ||
          n.customer_id::text || '^?$%' ||
          n.deposits_recency::text || '^?$%' ||
          n.deposits_frequency::text || '^?$%' ||
          n.deposits_monetary::text
        )::uuid AS hash,
        n.last_max_balance
    FROM temp_new_customers n;

    DROP TABLE IF EXISTS temp_prev_balance;
    CREATE TEMP TABLE temp_prev_balance AS
    SELECT
        p.customer_hub_id,
        SUM((kv.value)::numeric) AS prev_total_balance
    FROM temp_prev_state p
    LEFT JOIN LATERAL jsonb_each_text(COALESCE(p.last_max_balance, '{}'::jsonb)) kv ON true
    GROUP BY p.customer_hub_id;

    DROP TABLE IF EXISTS temp_reactivated;
    CREATE TEMP TABLE temp_reactivated AS
    SELECT
        a.customer_hub_id,
        a.customer_id,
        load_date AS deposits_recency,
        a.total_accounts AS deposits_frequency,
        m.deposits_monetary_new AS deposits_monetary,
        m.last_max_balance_new AS last_max_balance,
        daterange(load_date, NULL) AS date_range
    FROM temp_customer_agg a
    JOIN temp_monetary m USING (customer_hub_id)
    JOIN temp_prev_balance pb USING (customer_hub_id)
    WHERE a.is_active = TRUE
      AND COALESCE(pb.prev_total_balance, 0) = 0;  -- True Reactivation Only

    UPDATE feature_store.rfm_deposits t
    SET date_range = daterange(lower(t.date_range), load_date),
        last_updated_dwh = now()
    WHERE prev_load_date >= lower(t.date_range)
      AND (upper(t.date_range) IS NULL OR prev_load_date < upper(t.date_range))
      AND t.customer_hub_id IN (SELECT customer_hub_id FROM temp_reactivated);

    SELECT string_agg(quote_ident(column_name), ', ')
      INTO insert_cols
    FROM information_schema.columns
    WHERE table_schema = 'feature_store'
      AND table_name = 'rfm_deposits'
      AND column_name NOT IN ('created_dwh','last_updated_dwh');

    SELECT string_agg(
             'coalesce(' || quote_ident(column_name) || '::text, '''') || ''^?$%''',
             ' || '
           )
      INTO hash_expr
    FROM information_schema.columns
    WHERE table_schema = 'feature_store'
      AND table_name = 'rfm_deposits'
      AND column_name NOT IN ('customer_hub_id','customer_id','date_range','hash','created_dwh','last_updated_dwh');

    result_sql := format($$DROP TABLE IF EXISTS temp_inserted;
        CREATE TEMP TABLE temp_inserted AS
        SELECT
            r.*,
            md5(%s)::uuid AS hash
        FROM temp_reactivated r;$$, hash_expr);
    EXECUTE result_sql;

    EXECUTE format($$INSERT INTO feature_store.rfm_deposits (%s)
                    SELECT %s FROM temp_inserted;$$,
                    insert_cols, insert_cols);

    ANALYZE feature_store.rfm_deposits;

    RAISE NOTICE 'RFM Deposits incremental load complete for %', load_date;
END;
$procedure$
;

Appreciate any tips

sql postgresql incremental