I have to adapt the rfm metrics logic to bank products, in this case deposits
The business logic explained to me is such:
recency - last date of a positive balance on any of the deposit accounts of a customer
frequency - culminative number of all deposit accounts of a customer
monetary - sum of max balances of all deposit accounts of a customer
the rules for new row inserts and updates:
a new date range start whenever a customer goes from zero balance to positive
lets say I have a customer who had positive balances from 2020-01-08 untill 2020-05-09
on 2020-05-09 all of its deposits balances became 0
he will have an open ended date range [2020-01-08) with recency 2020-05-09
this date range will only close and a new one will be inserted only when he once again comes back from 0
I have been at it for several days, but I just cannot comprehend how can I make it work...
Here is the code I have so far:
-- DROP PROCEDURE feature_store.rfm_deposits_load_inc(date);
CREATE OR REPLACE PROCEDURE feature_store.rfm_deposits_load_inc(IN load_date date)
LANGUAGE plpgsql
AS $procedure$
DECLARE
prev_load_date date := load_date - interval '1 day';
insert_cols text;
hash_expr text;
result_sql text;
BEGIN
-- Step 1: Rollback if already processed
if exists (
select 1
from feature_store.rfm_deposits
where date_range >> daterange(prev_load_date, load_date)
) then
raise notice 'Already calculated for this load_date, rolling back...';
delete from feature_store.rfm_deposits
where date_range >> daterange(prev_load_date, load_date);
update feature_store.rfm_deposits
set date_range = daterange(lower(date_range), null)
where prev_load_date <@ date_range and date_range << daterange('9999-12-31'::date, null);
raise notice 'Rollback complete';
end if;
DROP TABLE IF EXISTS temp_today_balances;
CREATE TEMP TABLE temp_today_balances AS
SELECT
d.customer_hub_id,
d.customer_id,
d.main_account_hub_id AS account_hub_id,
MAX(ab.sumn) AS balance_today
FROM l2.deposits d
JOIN l2.accounts_balances ab
ON ab.account_hub_id = d.main_account_hub_id
AND load_date >= lower(ab.date_range)
AND (upper(ab.date_range) IS NULL OR load_date < upper(ab.date_range))
INNER JOIN l1_dbo.hs_customer_onlinebankdb_active c
ON c.hub_id = d.customer_hub_id
WHERE d.open_date >= '2020-01-01'
AND c.customer_type_id = 1
GROUP BY d.customer_hub_id, d.customer_id, d.main_account_hub_id;
CREATE INDEX ON temp_today_balances(customer_hub_id);
ANALYZE temp_today_balances;
DROP TABLE IF EXISTS temp_customer_agg;
CREATE TEMP TABLE temp_customer_agg AS
SELECT
customer_hub_id,
MAX(customer_id) AS customer_id,
SUM(balance_today) AS total_balance,
COUNT(DISTINCT account_hub_id) AS total_accounts,
BOOL_OR(balance_today > 0) AS is_active
FROM temp_today_balances
GROUP BY customer_hub_id;
CREATE INDEX ON temp_customer_agg(customer_hub_id);
ANALYZE temp_customer_agg;
DROP TABLE IF EXISTS temp_prev_state;
CREATE TEMP TABLE temp_prev_state AS
SELECT
r.customer_hub_id,
r.customer_id,
r.deposits_recency,
r.deposits_frequency,
r.deposits_monetary,
r.last_max_balance,
lower(r.date_range) AS range_start,
upper(r.date_range) AS range_end
FROM feature_store.rfm_deposits r
WHERE prev_load_date >= lower(r.date_range)
AND (upper(r.date_range) IS NULL OR prev_load_date < upper(r.date_range));
CREATE INDEX ON temp_prev_state(customer_hub_id);
ANALYZE temp_prev_state;
DROP TABLE IF EXISTS temp_kv;
CREATE TEMP TABLE temp_kv AS
SELECT
COALESCE(p.customer_hub_id, t.customer_hub_id) AS customer_hub_id,
COALESCE(p.account_hub_id, t.account_hub_id) AS account_hub_id,
GREATEST(
COALESCE((p.value)::numeric, 0),
COALESCE(t.balance_today, 0)
) AS max_balance
FROM (
SELECT ps.customer_hub_id, kv.key::bigint AS account_hub_id, kv.value
FROM temp_prev_state ps
LEFT JOIN LATERAL jsonb_each_text(COALESCE(ps.last_max_balance, '{}'::jsonb)) kv ON true
) p
FULL JOIN temp_today_balances t
ON p.customer_hub_id = t.customer_hub_id
AND p.account_hub_id = t.account_hub_id;
DROP TABLE IF EXISTS temp_monetary;
CREATE TEMP TABLE temp_monetary AS
SELECT
customer_hub_id,
jsonb_object_agg(account_hub_id::text, max_balance) AS last_max_balance_new,
SUM(max_balance) AS deposits_monetary_new
FROM temp_kv
GROUP BY customer_hub_id;
CREATE INDEX ON temp_monetary(customer_hub_id);
ANALYZE temp_monetary;
DROP TABLE IF EXISTS temp_transitions;
CREATE TEMP TABLE temp_transitions AS
SELECT
COALESCE(a.customer_hub_id, p.customer_hub_id) AS customer_hub_id,
COALESCE(a.customer_id, p.customer_id) AS customer_id,
COALESCE(a.total_accounts, p.deposits_frequency, 0) AS deposits_frequency_new,
COALESCE(m.deposits_monetary_new, p.deposits_monetary, 0) AS deposits_monetary_new,
COALESCE(m.last_max_balance_new, p.last_max_balance, '{}'::jsonb) AS last_max_balance_new,
a.is_active AS active_today,
p.deposits_recency AS prev_recency,
CASE
WHEN COALESCE(a.is_active, FALSE) THEN load_date
ELSE p.deposits_recency
END AS deposits_recency_new
FROM temp_prev_state p
FULL JOIN temp_customer_agg a ON a.customer_hub_id = p.customer_hub_id
LEFT JOIN temp_monetary m ON m.customer_hub_id = COALESCE(a.customer_hub_id, p.customer_hub_id);
CREATE INDEX ON temp_transitions(customer_hub_id);
ANALYZE temp_transitions;
UPDATE feature_store.rfm_deposits t
SET deposits_recency = tr.deposits_recency_new,
deposits_monetary = tr.deposits_monetary_new,
deposits_frequency = tr.deposits_frequency_new,
last_max_balance = tr.last_max_balance_new,
last_updated_dwh = now()
FROM temp_transitions tr
WHERE prev_load_date >= lower(t.date_range)
AND (upper(t.date_range) IS NULL OR prev_load_date < upper(t.date_range))
AND t.customer_hub_id = tr.customer_hub_id;
DROP TABLE IF EXISTS temp_new_customers;
CREATE TEMP TABLE temp_new_customers AS
SELECT
a.customer_hub_id,
a.customer_id,
load_date AS deposits_recency,
a.total_accounts AS deposits_frequency,
m.deposits_monetary_new AS deposits_monetary,
m.last_max_balance_new AS last_max_balance,
daterange(load_date, NULL) AS date_range
FROM temp_customer_agg a
JOIN temp_monetary m USING (customer_hub_id)
LEFT JOIN feature_store.rfm_deposits r ON r.customer_hub_id = a.customer_hub_id
WHERE a.is_active = TRUE
AND r.customer_hub_id IS NULL;
INSERT INTO feature_store.rfm_deposits (
customer_hub_id,
customer_id,
deposits_recency,
deposits_frequency,
deposits_monetary,
date_range,
hash,
last_max_balance
)
SELECT
n.customer_hub_id,
n.customer_id,
n.deposits_recency,
n.deposits_frequency,
n.deposits_monetary,
n.date_range,
md5(
n.customer_hub_id::text || '^?$%' ||
n.customer_id::text || '^?$%' ||
n.deposits_recency::text || '^?$%' ||
n.deposits_frequency::text || '^?$%' ||
n.deposits_monetary::text
)::uuid AS hash,
n.last_max_balance
FROM temp_new_customers n;
DROP TABLE IF EXISTS temp_prev_balance;
CREATE TEMP TABLE temp_prev_balance AS
SELECT
p.customer_hub_id,
SUM((kv.value)::numeric) AS prev_total_balance
FROM temp_prev_state p
LEFT JOIN LATERAL jsonb_each_text(COALESCE(p.last_max_balance, '{}'::jsonb)) kv ON true
GROUP BY p.customer_hub_id;
DROP TABLE IF EXISTS temp_reactivated;
CREATE TEMP TABLE temp_reactivated AS
SELECT
a.customer_hub_id,
a.customer_id,
load_date AS deposits_recency,
a.total_accounts AS deposits_frequency,
m.deposits_monetary_new AS deposits_monetary,
m.last_max_balance_new AS last_max_balance,
daterange(load_date, NULL) AS date_range
FROM temp_customer_agg a
JOIN temp_monetary m USING (customer_hub_id)
JOIN temp_prev_balance pb USING (customer_hub_id)
WHERE a.is_active = TRUE
AND COALESCE(pb.prev_total_balance, 0) = 0; -- True Reactivation Only
UPDATE feature_store.rfm_deposits t
SET date_range = daterange(lower(t.date_range), load_date),
last_updated_dwh = now()
WHERE prev_load_date >= lower(t.date_range)
AND (upper(t.date_range) IS NULL OR prev_load_date < upper(t.date_range))
AND t.customer_hub_id IN (SELECT customer_hub_id FROM temp_reactivated);
SELECT string_agg(quote_ident(column_name), ', ')
INTO insert_cols
FROM information_schema.columns
WHERE table_schema = 'feature_store'
AND table_name = 'rfm_deposits'
AND column_name NOT IN ('created_dwh','last_updated_dwh');
SELECT string_agg(
'coalesce(' || quote_ident(column_name) || '::text, '''') || ''^?$%''',
' || '
)
INTO hash_expr
FROM information_schema.columns
WHERE table_schema = 'feature_store'
AND table_name = 'rfm_deposits'
AND column_name NOT IN ('customer_hub_id','customer_id','date_range','hash','created_dwh','last_updated_dwh');
result_sql := format($$DROP TABLE IF EXISTS temp_inserted;
CREATE TEMP TABLE temp_inserted AS
SELECT
r.*,
md5(%s)::uuid AS hash
FROM temp_reactivated r;$$, hash_expr);
EXECUTE result_sql;
EXECUTE format($$INSERT INTO feature_store.rfm_deposits (%s)
SELECT %s FROM temp_inserted;$$,
insert_cols, insert_cols);
ANALYZE feature_store.rfm_deposits;
RAISE NOTICE 'RFM Deposits incremental load complete for %', load_date;
END;
$procedure$
;
Appreciate any tips