Условие
Дан кликстрим events с полями:
user_id,medium— маркетинговый канал,event_name—page_view/purchase,page_location,transaction_id(только дляpurchase),to_timestamp— время события.
Разбейте действия пользователей на сессии. Триггер разрыва сессии:
- Появление нового
medium(маркетинговый канал сменился). - Совершение транзакции.
- Неактивность > 2 часов.
Для сессий, начавшихся не из-за смены канала, medium = 'unknown'.
Результат — таблица с теми же столбцами + сгенерированный session_id.
Решение
Подход
Это классическая multi-trigger sessionization. Идея:
- Для каждого события вычисляем флаг «новая сессия».
- Кумулятивная сумма флагов внутри
user_id→session_id. - Дополнительно считаем «эффективный»
mediumдля каждой сессии (если триггер былmedium-смена— наследуем новый medium; иначе'unknown').
Реализация
WITH events_ord AS (
SELECT
user_id,
medium,
event_name,
page_location,
transaction_id,
to_timestamp,
LAG(to_timestamp) OVER (PARTITION BY user_id ORDER BY to_timestamp) AS prev_ts,
LAG(medium) OVER (PARTITION BY user_id ORDER BY to_timestamp) AS prev_medium,
LAG(event_name) OVER (PARTITION BY user_id ORDER BY to_timestamp) AS prev_event
FROM events
),
flagged AS (
SELECT
*,
-- 1) первое событие юзера = новая сессия
-- 2) предыдущее = purchase → новая сессия (так как purchase закрывает)
-- 3) разрыв > 2 часа
-- 4) сменился medium
CASE
WHEN prev_ts IS NULL THEN 1
WHEN prev_event = 'purchase' THEN 1
WHEN EXTRACT(EPOCH FROM (to_timestamp - prev_ts)) > 7200 THEN 1
WHEN medium IS DISTINCT FROM prev_medium THEN 1
ELSE 0
END AS is_new_session,
-- Тип триггера: нужен, чтобы потом понять, наследовать medium или ставить 'unknown'
CASE
WHEN prev_ts IS NULL THEN 'first'
WHEN medium IS DISTINCT FROM prev_medium THEN 'medium_change'
WHEN prev_event = 'purchase' THEN 'after_purchase'
WHEN EXTRACT(EPOCH FROM (to_timestamp - prev_ts)) > 7200 THEN 'inactivity'
ELSE 'continuation'
END AS trigger_type
FROM events_ord
),
sessioned AS (
SELECT
*,
SUM(is_new_session) OVER (PARTITION BY user_id ORDER BY to_timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS session_num
FROM flagged
),
session_starts AS (
SELECT
user_id, session_num,
MIN(trigger_type) FILTER (WHERE is_new_session = 1) AS start_trigger,
-- medium на старте сессии: оригинальный medium первого события
(ARRAY_AGG(medium ORDER BY to_timestamp))[1] AS first_medium
FROM sessioned
GROUP BY user_id, session_num
),
labeled AS (
SELECT
s.user_id,
s.event_name,
s.page_location,
s.transaction_id,
s.to_timestamp,
-- session_id уникальный
CONCAT(s.user_id, '_', s.session_num) AS session_id,
-- medium сессии: новый только если триггер = смена medium
CASE
WHEN ss.start_trigger = 'medium_change' THEN ss.first_medium
WHEN ss.start_trigger = 'first' THEN ss.first_medium -- для первой сессии берём оригинальный
ELSE 'unknown'
END AS session_medium
FROM sessioned s
JOIN session_starts ss USING (user_id, session_num)
)
SELECT * FROM labeled
ORDER BY user_id, to_timestamp;Реализация — Python (если SQL негибок)
import pandas as pd
df = pd.read_csv("clickstream.csv", parse_dates=["to_timestamp"])
df = df.sort_values(["user_id", "to_timestamp"]).reset_index(drop=True)
g = df.groupby("user_id")
df["prev_ts"] = g["to_timestamp"].shift(1)
df["prev_medium"] = g["medium"].shift(1)
df["prev_event"] = g["event_name"].shift(1)
gap = (df["to_timestamp"] - df["prev_ts"]).dt.total_seconds()
cond_first = df["prev_ts"].isna()
cond_purchase = df["prev_event"] == "purchase"
cond_inactive = gap > 7200
cond_medium = df["medium"] != df["prev_medium"]
df["trigger_type"] = "continuation"
df.loc[cond_first, "trigger_type"] = "first"
df.loc[cond_medium & ~cond_first, "trigger_type"] = "medium_change"
df.loc[cond_purchase & ~cond_first & ~cond_medium, "trigger_type"] = "after_purchase"
df.loc[cond_inactive & ~cond_first & ~cond_medium & ~cond_purchase, "trigger_type"] = "inactivity"
df["is_new_session"] = (df["trigger_type"] != "continuation").astype(int)
df["session_num"] = df.groupby("user_id")["is_new_session"].cumsum()
df["session_id"] = df["user_id"].astype(str) + "_" + df["session_num"].astype(str)
# Эффективный medium сессии
session_meta = (df.groupby(["user_id", "session_num"])
.agg(start_trigger=("trigger_type", "first"),
first_medium=("medium", "first"))
.reset_index())
session_meta["session_medium"] = session_meta.apply(
lambda r: r["first_medium"]
if r["start_trigger"] in ("medium_change", "first")
else "unknown",
axis=1
)
df = df.merge(session_meta[["user_id", "session_num", "session_medium"]],
on=["user_id", "session_num"])Анализ / интерпретация
Получаем атрибуцию сессии маркетинговому каналу:
- Если юзер пришёл с
email, сессия = email. - Если он же дальше внутри одного захода что-то делает (без новых параметров) — сессия не «email», а 'unknown' (по требованию задачи).
- Если в новой сессии не было utm-сигнала (пришёл напрямую) — 'unknown'.
Это last-touch attribution с ограничением: атрибутируется только сессия, в которой пришёл utm. Это похоже на «сессию по GA», где medium фиксируется при заходе с UTM, а потом sessions без UTM считаются (direct) / (none).
Подводные камни
- Порядок проверки триггеров. Если событие —
purchaseс одновременно сменойmedium, что приоритетнее? Нужно зафиксировать порядок (см.CASE WHEN ... ELSIF ...). IS DISTINCT FROMvs<>. NULL не равен NULL. Если у первого событияmedium = NULL, а потом сноваNULL— обычное<>не сработает.IS DISTINCT FROMкорректен.prev_event = 'purchase'означает «после транзакции». Это после транзакции, не до. То есть транзакция «закрывает» текущую сессию, а следующее событие открывает новую.unknownдля смены medium. Если в новой сессииmedium = NULL, нужно ли тоже'unknown'? Условие задачи говорит «если разрыв НЕ по смене канала, medium = unknown». Если новая сессия началась из-за смены, но новый medium = NULL — формально это смена. Уточняйте.- Окно
2 часа. EXTRACT(EPOCH ...) > 7200 — секунды. Не путать с минутами. - Выборка с
LAG. Если события юзера приходят из разных систем с разной точностью времени, могут совпадатьto_timestamp. Tie-breaker (например,event_id) нужен для стабильногоLAG. session_idуникальность.CONCAT(user_id, '_', session_num)— уникально в рамках всех данных, но не глобально (другой user может получить такую же session_num). Если глобальная уникальность нужна —MD5(user_id || '_' || session_num || '_' || min_ts).- Пользователи без событий между сессиями. Не появляются в выходной таблице — это ожидаемо, но в отчётности учтите.
Эталонный ответ
Структура:
LAG(ts, medium, event)для сравнения с предыдущим событием.- Триггер «новая сессия» = first OR
prev_event = 'purchase'ORgap > 2hORmedium changed. - Кумулятивная сумма триггеров →
session_num. - Логика
session_medium: если триггер =medium_changeилиfirst, наследуемmediumпервого события; иначе'unknown'.
Главные тонкости — порядок триггеров, обработка NULL в medium через IS DISTINCT FROM, правильное окно с UNBOUNDED PRECEDING.