Оптимизация запросов оконных функций

В недавнем проекте мне нужно было создать функции, чтобы сравнить среднее время обработки данного сотрудника с общим средним временем обработки дел в очереди.

После недолгих размышлений и исследований я пришел к следующему запросу:

SELECT
 t.*,
 AVG(handle_time_secs) OVER (PARTITION BY queue_name ORDER BY UNIX_DATE(DATE(assigned_at)) RANGE BETWEEN 29 PRECEDING AND CURRENT ROW) AS queue_aht_30days,
 AVG(handle_time_secs) OVER (PARTITION BY emp_id, queue_name ORDER BY UNIX_DATE(DATE(assigned_at)) RANGE BETWEEN 29 PRECEDING AND CURRENT ROW) AS emp_queue_aht_30days
FROM project.dataset.table t

Однако, когда я попытался запустить его в BigQuery, я получил следующее исключение:

Resources exceeded during query execution: The query could not be executed in the allotted memory. Peak usage: 158% of limit. Top memory consumer(s): sort operations used for analytic OVER() clauses: 98% other/unattributed: 2%

Мой менеджер предложил мне уменьшить количество строк, которые оконная функция должна перебирать, сначала агрегируя данные по дням.

Следующий запрос сначала получает отметку времени, когда дело было впервые назначено сотруднику, а затем вычисляет общее и среднее время обработки обращений по дням.

WITH features_daily_emp_queue AS (
 SELECT
    queue_name,
    emp_id,
    DATE(assigned_at) AS assigned_at_date,
    avg(handle_time_secs) emp_queue_1day_aht,
    sum(handle_time_secs) emp_queue_1day_total_ht,
    COUNT(1) emp_queue_1day_cases
 FROM project.dataset.table
 GROUP BY
    queue_name,
    emp_id,
    assigned_at_date
)

Затем, вместо использования функции `AVG`, мы можем вычислить среднее значение, суммируя общее время обработки за каждый день и разделив его на сумму количества случаев, назначенных каждый день.

SELECT
 (SUM(emp_queue_1day_total_ht) OVER (PARTITION BY queue_name ORDER BY UNIX_DATE(assigned_at_date) RANGE BETWEEN 29 PRECEDING AND CURRENT ROW)) / (SUM(emp_queue_1day_cases) OVER (PARTITION BY queue_name ORDER BY UNIX_DATE(assigned_at_date) RANGE BETWEEN 29 PRECEDING AND CURRENT ROW)) AS queue_aht_30days
FROM features_daily_emp_queue

Мы можем повторить процесс; добавление идентификатора сотрудника в раздел.

SELECT
   (SUM(emp_queue_1day_total_ht) OVER (PARTITION BY emp_id, queue_name ORDER BY UNIX_DATE(assigned_at_date) RANGE BETWEEN 29 PRECEDING AND CURRENT ROW) /
              (SUM(emp_queue_1day_touches) OVER (PARTITION BY emp_id, queue_name ORDER BY UNIX_DATE(assigned_at_date) RANGE BETWEEN 29 PRECEDING AND CURRENT ROW)) AS emp_queue_aht_30days
FROM features_daily_emp_queue

Наконец, мы собираем все это вместе и присоединяем функции обратно к исходной таблице, используя идентификатор сотрудника и назначенную дату.

WITH features_daily_emp_queue AS (
       SELECT
           queue_name,
           emp_id,
           DATE(assigned_at) AS assigned_at_date,
           avg(handle_time_secs) emp_queue_1day_aht,
           sum(handle_time_secs) emp_queue_1day_total_ht,
           COUNT(1) emp_queue_1day_cases
       FROM project.dataset.table
       GROUP BY
           queue_name,
           emp_id,
           assigned_at_date
   ),
   features_daily_emp_queue_window AS (
       SELECT
           queue_name,
           emp_id,
           assigned_at_date,
           (SUM(emp_queue_1day_total_ht) OVER (PARTITION BY queue_name ORDER BY UNIX_DATE(assigned_at_date) RANGE BETWEEN 29 PRECEDING AND CURRENT ROW) /
              SUM(emp_queue_1day_cases) OVER (PARTITION BY queue_name ORDER BY UNIX_DATE(assigned_at_date) RANGE BETWEEN 29 PRECEDING AND CURRENT ROW)) AS queue_aht_30days,
               (SUM(emp_queue_1day_total_ht) OVER (PARTITION BY emp_id, queue_name ORDER BY UNIX_DATE(assigned_at_date) RANGE BETWEEN 29 PRECEDING AND CURRENT ROW) /
              SUM(emp_queue_1day_cases) OVER (PARTITION BY emp_id, queue_name ORDER BY UNIX_DATE(assigned_at_date) RANGE BETWEEN 29 PRECEDING AND CURRENT ROW)) AS emp_queue_aht_30days
   )
SELECT
   t.*,
   queue_aht_30days,
   emp_queue_aht_30days
FROM project.dataset.table t
   LEFT JOIN features_daily_emp_queue_window f
    ON t.emp_id = f.emp_id
     AND DATE(t.assigned_at) = f.assigned_at_date

В отличие от исходного запроса, предыдущий запрос выполнялся до завершения.