6.5. Projects Query Jobs Elapsed Time Analysis

Below is an example of a query to evaluate BigQuery performance project-by-project in terms of the elapsed time of recent queries and matching prior jobs. For more details on this query, see the discussion after the query.

Search the query for TODO to find lines where common changes can be made. Script variables can be modified as needed, as can the common table expressions recent_jobs and jobs. Choose an appropriate source view: JOBS_BY_USER, JOBS_BY_PROJECT, JOBS_BY_ORGANIZATION, or an archive of one of these. Modify the filters as needed. If you have access to an ASSIGNMENT_CHANGES_BY_PROJECT view, comment and uncomment the lines marked for it in the query below.

-- TODO: modify script variables as needed
-- End time of "recent" jobs.
declare end_recent_timestamp timestamp default
    current_timestamp
;
-- How far back to go for "recent" jobs.
declare recent_milliseconds int64 default
    2      -- hours
    * 3600 -- seconds/hour
    * 1000 -- milliseconds/second
;
-- Only include a "recent" job if its creation_to_end_ms is greater than or
-- equal to the threshold.
declare threshold_creation_to_end_time_ms int64 default
    5      -- minutes
    * 60   -- seconds/minute
    * 1000 -- milliseconds/second
;
-- How far back to go for history for "recent" jobs.
declare prior_days int64 default
    7
;
-- Only include a prior job as part of history for a "recent" job if the
-- absolute value of the difference in time of day of creation_time between the
-- "recent" job and the prior job is within the threshold.
declare threshold_creation_time_ms int64 default
    2      -- hours
    * 3600 -- seconds/hour
    * 1000 -- milliseconds/second
;
-- Only include a prior job as part of history for a "recent" job if the
-- absolute value of the percent difference in total_bytes_processed between
-- the "recent" job and the prior job is within the threshold.
declare threshold_total_bytes_processed_pct float64 default
    0.10
;

declare start_recent_timestamp timestamp default 
    timestamp_sub(
        end_recent_timestamp
        ,interval recent_milliseconds millisecond
    )
;
declare end_date date default
    date(end_recent_timestamp)
;
declare start_date date default
    date_sub(
        end_date
        ,interval prior_days day
    )
;

with
recent_jobs as
(
    select
        j.project_id
        ,j.job_id
        ,j.reservation_id
        ,j.query
        ,j.statement_type
        ,character_length(j.query) as query_length
        ,sha512(j.query) as query_sha512
        ,j.destination_table.project_id as destination_project_id
        ,case
            when
                j.destination_table.dataset_id like '_%'
                and j.destination_table.table_id like 'anon%'
                then '<hidden>'
            else j.destination_table.dataset_id
        end as destination_dataset_id
        ,case
            when
                j.destination_table.dataset_id like '_%'
                and j.destination_table.table_id like 'anon%'
                then '<anonymous>'
            when
                j.destination_table.table_id like '%$%'
                then
                regexp_replace(
                    j.destination_table.table_id
                    ,'$.*'
                    ,'$<partition>'
                )
            else j.destination_table.table_id
        end as destination_table_id
        ,j.creation_time
        ,j.end_time
        ,coalesce(
            j.end_time
            ,current_timestamp
        ) as end_or_current_time
        ,j.total_bytes_processed
        ,j.timeline[
            safe_ordinal(array_length(j.timeline))
        ].completed_units
        ,(
            select
                sum(js.shuffle_output_bytes)
            from unnest(j.job_stages) js
        ) as shuffle_output_bytes
        ,(
            select
                sum(js.shuffle_output_bytes_spilled)
            from unnest(j.job_stages) js
        ) as shuffle_output_bytes_spilled
        ,j.job_stages[
            safe_ordinal(array_length(j.job_stages))
        ].records_written as final_records_written
        ,j.total_slot_ms
        ,timestamp_diff(
            coalesce(
                j.end_time
                ,current_timestamp
            )
            ,j.creation_time
            ,millisecond
        ) as creation_to_end_ms
-- TODO: comment/uncomment lines as needed to choose jobs source
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION` j
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT` j
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_USER` j
--     from `PROJECT_ID.ARCHIVE_DATASET.JOBS` j
    -- default to this query's billing project
--     from `region-us.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION` j
--     from `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT` j
    from `region-us.INFORMATION_SCHEMA.JOBS_BY_USER` j
    where
        j.creation_time between start_recent_timestamp and end_recent_timestamp
        and j.job_type = 'QUERY'
        and j.statement_type <> 'SCRIPT'
        and j.total_slot_ms > 0
        and timestamp_diff(
            coalesce(
                j.end_time
                ,current_timestamp
            )
            ,j.creation_time
            ,millisecond
        ) >= threshold_creation_to_end_time_ms
-- TODO: modify filters as needed
        -- some optional recent query filters
--         and j.project_id like 'pr-%'
--         and j.project_id = 'PROJECT_ID'
--         and j.job_id = 'JOB_ID'
)
,recent_jobs_ranked as
(
    select
        j.project_id
        ,j.job_id
        ,j.reservation_id
        ,j.query
        ,j.statement_type
        ,j.query_length
        ,j.query_sha512
        ,struct(
            j.destination_project_id as project_id
            ,j.destination_dataset_id as dataset_id
            ,j.destination_table_id as table_id
        ) as destination_table
        ,j.creation_time
        ,j.end_time
        ,j.end_or_current_time
        ,j.total_bytes_processed
        ,j.completed_units
        ,j.shuffle_output_bytes
        ,j.shuffle_output_bytes_spilled
        ,j.final_records_written
        ,j.total_slot_ms
        ,j.creation_to_end_ms
        ,row_number() over (
            partition by
                j.project_id
                ,j.statement_type
                ,character_length(j.query)
                ,sha512(j.query)
                ,j.destination_project_id
                ,j.destination_dataset_id
                ,j.destination_table_id
            order by
                j.creation_to_end_ms desc
        ) as row_number
    from recent_jobs j
)
,recent_queries as
(
    select
        j.* except (
            row_number
        )
    from recent_jobs_ranked j
    where
        row_number = 1
)
,jobs as
(
    select
        j.project_id
        ,j.job_id
        ,rq.job_id as recent_job_id
        ,rq.creation_time as recent_creation_time
        ,rq.reservation_id as recent_reservation_id
        ,j.query
        ,j.statement_type
        ,struct(
            j.destination_table.project_id
            ,case
                when
                    j.destination_table.dataset_id like '_%'
                    and j.destination_table.table_id like 'anon%'
                    then '<hidden>'
                else j.destination_table.dataset_id
            end as dataset_id
            ,case
                when
                    j.destination_table.dataset_id like '_%'
                    and j.destination_table.table_id like 'anon%'
                    then '<anonymous>'
                when
                    j.destination_table.table_id like '%$%'
                    then
                    regexp_replace(
                        j.destination_table.table_id
                        ,'$.*'
                        ,'$<partition>'
                    )
                else j.destination_table.table_id
            end as table_id
        ) as destination_table
        ,j.creation_time
        ,j.end_time
        ,coalesce(
            j.end_time
            ,current_timestamp
        ) as end_or_current_time
        ,j.total_bytes_processed
        ,j.timeline[
            safe_ordinal(array_length(j.timeline))
        ].completed_units
        ,(
            select
                sum(js.shuffle_output_bytes)
            from unnest(j.job_stages) js
        ) as shuffle_output_bytes
        ,(
            select
                sum(js.shuffle_output_bytes_spilled)
            from unnest(j.job_stages) js
        ) as shuffle_output_bytes_spilled
        ,j.job_stages[
            safe_ordinal(array_length(j.job_stages))
        ].records_written as final_records_written
        ,j.total_slot_ms
        ,timestamp_diff(
            coalesce(
                j.end_time
                ,current_timestamp
            )
            ,j.creation_time
            ,millisecond
        ) as creation_to_end_ms
-- TODO: comment/uncomment lines as needed to choose jobs source
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION` j
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT` j
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_USER` j
--     from `PROJECT_ID.ARCHIVE_DATASET.JOBS` j
    -- default to this query's billing project
--     from `region-us.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION` j
--     from `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT` j
    from `region-us.INFORMATION_SCHEMA.JOBS_BY_USER` j
    join recent_queries rq
    on
        rq.project_id = j.project_id
        and rq.statement_type = j.statement_type
        and rq.query_length = character_length(j.query)
        and rq.query_sha512 = sha512(j.query)
        and rq.destination_table.project_id = j.destination_table.project_id
        and rq.destination_table.dataset_id =
            case
                when
                    j.destination_table.dataset_id like '_%'
                    and j.destination_table.table_id like 'anon%'
                    then '<hidden>'
                else j.destination_table.dataset_id
            end
        and rq.destination_table.table_id =
            case
                when
                    j.destination_table.dataset_id like '_%'
                    and j.destination_table.table_id like 'anon%'
                    then '<anonymous>'
                when
                    j.destination_table.table_id like '%$%'
                    then
                    regexp_replace(
                        j.destination_table.table_id
                        ,'$.*'
                        ,'$<partition>'
                    )
                else j.destination_table.table_id
            end
        and
            abs(
                time_diff(
                    time(rq.creation_time)
                    ,time(j.creation_time)
                    ,millisecond
                )
            ) <= threshold_creation_time_ms
        and (
            abs(
                safe_divide(
                    j.total_bytes_processed
                        - rq.total_bytes_processed
                    ,rq.total_bytes_processed
                )
            ) <= threshold_total_bytes_processed_pct
            or rq.total_bytes_processed is null
        )
    where
        date(j.creation_time) between start_date and end_date
        and j.creation_time <= end_recent_timestamp
        and j.job_type = 'QUERY'
        and j.statement_type <> 'SCRIPT'
        and j.total_slot_ms > 0
)
,queries as
(
    select
        j.project_id
        ,j.query
        ,j.statement_type
        ,character_length(j.query) as query_length
        ,sha512(j.query) as query_hash
        ,struct(
            j.destination_table.project_id
            ,j.destination_table.dataset_id
            ,j.destination_table.table_id
        ) destination_table
        ,max(
            j.recent_job_id
        ) as recent_job_id
        ,max(
            j.recent_creation_time
        ) as recent_creation_time
        ,max(
            j.recent_reservation_id
        ) as recent_reservation_id
        ,max(
            case
                when j.job_id = j.recent_job_id then j.total_bytes_processed
            end
        ) as recent_total_bytes_processed
        ,max(
            case
                when j.job_id = j.recent_job_id then j.completed_units
            end
        ) as recent_completed_units
        ,max(
            case
                when j.job_id = j.recent_job_id then j.shuffle_output_bytes
            end
        ) as recent_shuffle_output_bytes
        ,max(
            case
                when j.job_id = j.recent_job_id then j.shuffle_output_bytes_spilled
            end
        ) as recent_shuffle_output_bytes_spilled
        ,max(
            case
                when j.job_id = j.recent_job_id then j.final_records_written
            end
        ) as recent_final_records_written
        ,max(
            case
                when j.job_id = j.recent_job_id then j.total_slot_ms
            end
        ) as recent_total_slot_ms
        ,max(
            case
                when j.job_id = j.recent_job_id then j.creation_to_end_ms
            end
        ) as recent_creation_to_end_ms
        ,count(distinct date(j.creation_time)) as date_count
        ,count(*) as job_count
        ,count(
            case
                when
                    j.creation_time between
                        start_recent_timestamp
                        and end_recent_timestamp
                    then
                    1
            end
        ) as recent_job_count
        ,sum(j.total_bytes_processed) as total_bytes_processed
        ,sum(j.completed_units) as completed_units
        ,sum(j.shuffle_output_bytes) as shuffle_output_bytes
        ,sum(j.shuffle_output_bytes_spilled) as shuffle_output_bytes_spilled
        ,sum(j.final_records_written) as final_records_written
        ,sum(j.total_slot_ms) as total_slot_ms
        ,sum(j.creation_to_end_ms) as creation_to_end_ms
        ,max(j.creation_to_end_ms) as max_creation_to_end_ms
    from jobs j
    group by
        j.project_id
        ,j.query
        ,j.statement_type
        ,j.destination_table.project_id
        ,j.destination_table.dataset_id
        ,j.destination_table.table_id
    having
        job_count > 1
)
,queries_diffs as
(
    select
        q.project_id
        ,q.recent_job_id
        ,q.recent_creation_time
        ,q.recent_reservation_id
        ,q.date_count
        ,q.job_count
        ,q.recent_total_bytes_processed
        ,safe_divide(
            q.recent_total_bytes_processed
                - q.total_bytes_processed / q.job_count
            ,q.total_bytes_processed / q.job_count
        ) as diff_total_bytes_processed_pct
        ,q.recent_completed_units
        ,safe_divide(
            q.recent_completed_units
                - q.completed_units / q.job_count
            ,q.completed_units / q.job_count
        ) as diff_completed_units_pct
        ,q.recent_shuffle_output_bytes
        ,safe_divide(
            q.recent_shuffle_output_bytes
                - q.shuffle_output_bytes / q.job_count
            ,q.shuffle_output_bytes / q.job_count
        ) as diff_shuffle_output_bytes_pct
        ,q.recent_shuffle_output_bytes_spilled
        ,safe_divide(
            q.recent_shuffle_output_bytes_spilled
                - q.shuffle_output_bytes_spilled / q.job_count
            ,q.shuffle_output_bytes_spilled / q.job_count
        ) as diff_shuffle_output_bytes_spilled_pct
        ,q.recent_final_records_written
        ,safe_divide(
            q.recent_final_records_written
            - q.final_records_written / q.job_count
            ,q.final_records_written / q.job_count
        ) as diff_final_records_written_pct
        ,q.recent_total_slot_ms
        ,safe_divide(
            q.recent_total_slot_ms
            - q.total_slot_ms / q.job_count
            ,q.total_slot_ms / q.job_count
        ) as diff_total_slot_ms_pct
        ,q.recent_creation_to_end_ms
        ,q.recent_creation_to_end_ms
            - cast(q.creation_to_end_ms / q.job_count as int64)
            as diff_creation_to_end_ms
        ,safe_divide(
            q.recent_creation_to_end_ms
                - q.creation_to_end_ms / q.job_count
            ,q.creation_to_end_ms / q.job_count
        ) as diff_creation_to_end_ms_pct
    from queries q
)
,queries_bucketed as
(
    select
        q.project_id
        ,countif(
            diff_creation_to_end_ms
            <
                1      -- seconds
                * 1000 -- milliseconds/second
        ) as diff_creation_to_end_0s_to_1s_query_count
        ,countif(
            diff_creation_to_end_ms
            >=
                1      -- seconds
                * 1000 -- milliseconds/second
            and
            diff_creation_to_end_ms
            <
                10     -- seconds
                * 1000 -- milliseconds/second
        ) as diff_creation_to_end_1s_to_10s_query_count
        ,countif(
            diff_creation_to_end_ms
            >=
                10     -- seconds
                * 1000 -- milliseconds/second
            and
            diff_creation_to_end_ms
            <
                1      -- minutes
                * 60   -- seconds/minute
                * 1000 -- milliseconds/second
        ) as diff_creation_to_end_10s_to_1m_query_count
        ,countif(
            diff_creation_to_end_ms
            >=
                1      -- minutes
                * 60   -- seconds/minute
                * 1000 -- milliseconds/second
            and
            diff_creation_to_end_ms
            <
                5      -- minutes
                * 60   -- seconds/minute
                * 1000 -- milliseconds/second
        ) as diff_creation_to_end_1m_to_5m_query_count
        ,countif(
            diff_creation_to_end_ms
            >=
                5      -- minutes
                * 60   -- seconds/minute
                * 1000 -- milliseconds/second
            and
            diff_creation_to_end_ms
            <
                15     -- minutes
                * 60   -- seconds/minute
                * 1000 -- milliseconds/second
        ) as diff_creation_to_end_5m_to_15m_query_count
        ,countif(
            diff_creation_to_end_ms
            >=
                15     -- minutes
                * 60   -- seconds/minute
                * 1000 -- milliseconds/second
            and
            diff_creation_to_end_ms
            <
                30     -- minutes
                * 60   -- seconds/minute
                * 1000 -- milliseconds/second
        ) as diff_creation_to_end_15m_to_30m_query_count
        ,countif(
            diff_creation_to_end_ms
            >=
                30     -- minutes
                * 60   -- seconds/minute
                * 1000 -- milliseconds/second
        ) as diff_creation_to_end_30m_to_inf_query_count
    from queries_diffs q
    group by
        q.project_id
)
,queries_ranked as
(
    select
        q.*
        ,row_number() over (
            partition by
                q.project_id
            order by
                q.diff_creation_to_end_ms desc
        ) as row_number
    from queries_diffs q
)
-- TODO: uncomment lines if have access to ASSIGNMENT_CHANGES_BY_PROJECT
-- ,assignments_history_stage as
-- (
--     select
--         ac.change_timestamp as start_time
--         ,coalesce(
--             timestamp_add(
--                 lead(ac.change_timestamp) over (
--                     partition by
--                         ac.project_id
--                         ,ac.assignment_id
--                     order by
--                         ac.change_timestamp
--                 )
--                 ,interval -1 microsecond
--             )
--             ,'9999-12-31 23:59:59.999999'
--         ) as end_time
--         ,ac.action
--         ,ac.project_id
--         ,ac.reservation_name
--         ,ac.assignee_type
--         ,ac.assignee_id
--     -- TODO: modify source as needed
--     from `ADMIN_PROJECT.region-us.INFORMATION_SCHEMA.ASSIGNMENT_CHANGES_BY_PROJECT` ac
-- )
-- ,assignments_history as
-- (
--     select
--         ahs.* except(
--             action
--         )
--     from assignments_history_stage ahs
--     where
--         ahs.action <> 'DELETE'
-- )
select
    q.project_id
-- TODO: comment line and uncomment lines if have access to ASSIGNMENT_CHANGES_BY_PROJECT
    ,q.recent_reservation_id
--     ,coalesce(
--         q.recent_reservation_id
--         ,concat(
--             ahp.project_id
--             ,':US.'
--             ,ahp.reservation_name
--         )
--         /* need to add logic for handling folder assignments... */
--         ,concat(
--             aho.project_id
--             ,':US.'
--             ,aho.reservation_name
--         )
--     ) as recent_reservation_id
    ,qb.diff_creation_to_end_0s_to_1s_query_count
    ,qb.diff_creation_to_end_1s_to_10s_query_count
    ,qb.diff_creation_to_end_10s_to_1m_query_count
    ,qb.diff_creation_to_end_1m_to_5m_query_count
    ,qb.diff_creation_to_end_5m_to_15m_query_count
    ,qb.diff_creation_to_end_15m_to_30m_query_count
    ,qb.diff_creation_to_end_30m_to_inf_query_count
    ,q.recent_job_id
    ,q.date_count
    ,q.job_count
    ,q.recent_total_bytes_processed
    ,q.diff_total_bytes_processed_pct
    ,q.recent_completed_units
    ,q.diff_completed_units_pct
    ,q.recent_shuffle_output_bytes
    ,q.diff_shuffle_output_bytes_pct
    ,q.recent_shuffle_output_bytes_spilled
    ,q.diff_shuffle_output_bytes_spilled_pct
    ,q.recent_final_records_written
    ,q.diff_final_records_written_pct
    ,q.recent_total_slot_ms
    ,q.diff_total_slot_ms_pct
    ,q.recent_creation_to_end_ms
    ,q.diff_creation_to_end_ms
    ,q.diff_creation_to_end_ms_pct
from queries_ranked q
join queries_bucketed qb
on
    qb.project_id = q.project_id
-- TODO: uncomment lines if have access to ASSIGNMENT_CHANGES_BY_PROJECT
-- left join assignments_history ahp
-- on
--     ahp.assignee_id = q.project_id
--     and q.recent_creation_time between ahp.start_time and ahp.end_time
--     and ahp.assignee_type = 'PROJECT'
-- left join assignments_history aho
-- on
--     q.recent_creation_time between aho.start_time and aho.end_time
--     and aho.assignee_type = 'ORGANIZATION'
where
    q.row_number = 1
order by
    q.diff_creation_to_end_ms desc
;

The query as written targets region-us.INFORMATION_SCHEMA.JOBS_BY_USER in two places, in the recent_jobs and jobs common table expressions. This can be changed to JOBS_BY_PROJECT, JOBS_BY_ORGANIZATION, or an archive by commenting and uncommenting the from lines. Under the comment "some optional recent query filters" there are commented-out examples of filters on recent queries. Finally, several script variables at the top of the script can be modified:

end_recent_timestamp

End time of recent jobs.

recent_milliseconds

How far back to go for recent jobs.

threshold_creation_to_end_time_ms

Only include a recent job if its creation_to_end_ms (elapsed time from creation_time to end_time) is greater than or equal to the threshold.

prior_days

How far back to go for history for recent jobs.

threshold_creation_time_ms

Only include a prior job as part of history for a recent job if the absolute value of the difference in time of day of creation_time between the recent job and the prior job is within the threshold.

threshold_total_bytes_processed_pct

Only include a prior job as part of history for a recent job if the absolute value of the percent difference in total_bytes_processed between the recent job and the prior job is within the threshold.
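
As a minimal sketch of the threshold_creation_time_ms comparison, the standalone query below applies the same time_diff expression that the jobs common table expression uses to two pairs of made-up timestamps. Because time() keeps only the time of day (UTC by default), the number of days between the two jobs is ignored, and two jobs on either side of midnight look almost a full day apart. The sample timestamps and the pair_name and time_of_day_diff_ms column names are illustrative assumptions.

```sql
-- Illustrative timestamps only; they do not come from any real job.
select
    pair_name
    ,abs(
        time_diff(
            time(recent_creation_time)
            ,time(prior_creation_time)
            ,millisecond
        )
    ) as time_of_day_diff_ms
from unnest([
    -- Five days apart, 1 hour 15 minutes apart by time of day: 4500000 ms.
    struct(
        'same_part_of_day' as pair_name
        ,timestamp '2024-01-08 09:30:00+00' as recent_creation_time
        ,timestamp '2024-01-03 08:15:00+00' as prior_creation_time
    )
    -- Twenty minutes apart across midnight: 85200000 ms by time of day.
    ,struct(
        'across_midnight'
        ,timestamp '2024-01-08 00:10:00+00'
        ,timestamp '2024-01-07 23:50:00+00'
    )
])
;
```

With the default 2-hour threshold (7200000 ms), the first pair would count as a match and the second would not.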

The query finds recent jobs: jobs created between start_recent_timestamp and end_recent_timestamp with job_type equal to 'QUERY', statement_type not equal to 'SCRIPT', total_slot_ms greater than 0, and the difference between end_time (or, if null, current_timestamp) and creation_time greater than or equal to threshold_creation_to_end_time_ms.
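
The elapsed-time filter can be tried in isolation. The sketch below is a hedged illustration, not part of the query above: sample_jobs and its three rows are invented, and only the timestamp_diff and coalesce expression is taken from recent_jobs.

```sql
-- Same default as the script variable above: 5 minutes in milliseconds.
declare threshold_creation_to_end_time_ms int64 default 5 * 60 * 1000;

with
sample_jobs as
(
    -- Hypothetical rows standing in for an INFORMATION_SCHEMA jobs view.
    select
        *
    from unnest([
        struct(
            'job_a' as job_id
            ,timestamp '2024-01-08 10:00:00+00' as creation_time
            ,timestamp '2024-01-08 10:07:30+00' as end_time
        )
        ,struct(
            'job_b'
            ,timestamp '2024-01-08 10:00:00+00'
            ,timestamp '2024-01-08 10:02:00+00'
        )
        -- A job that has not ended yet has a null end_time.
        ,struct(
            'job_c'
            ,timestamp '2024-01-08 10:00:00+00'
            ,cast(null as timestamp)
        )
    ])
)
select
    j.job_id
    ,timestamp_diff(
        coalesce(
            j.end_time
            ,current_timestamp
        )
        ,j.creation_time
        ,millisecond
    ) as creation_to_end_ms
from sample_jobs j
where
    timestamp_diff(
        coalesce(
            j.end_time
            ,current_timestamp
        )
        ,j.creation_time
        ,millisecond
    ) >= threshold_creation_to_end_time_ms
order by
    j.job_id
;
```

job_a (450000 ms) passes the filter and job_b (120000 ms) does not. job_c has no end_time, so its elapsed time is measured up to current_timestamp and keeps growing until the job ends.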

The query ranks recent jobs for each distinct recent query (determined by project_id, statement_type, query_length, query_sha512, and destination_table) by creation_to_end_ms descending. Only the top ranking job for each distinct recent query is kept.
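
The following sketch shows the idea of keeping one job per distinct query on a few invented rows. It is a simplified illustration: it keys only on the query text and destination table id, and sample_jobs, keyed_jobs, ranked_jobs, and job_rank are hypothetical names. The pattern r'\$.*' is written as a raw string with the dollar sign escaped so that it matches a literal '$' and everything after it; in RE2 syntax an unescaped $ is an end-of-text anchor.

```sql
with
sample_jobs as
(
    -- Hypothetical rows; job_1 and job_2 run the same query into
    -- different partitions of the same table.
    select
        *
    from unnest([
        struct(
            'job_1' as job_id
            ,'select 1' as query
            ,'events$20240107' as table_id
            ,420000 as creation_to_end_ms
        )
        ,struct('job_2', 'select 1', 'events$20240108', 660000)
        ,struct('job_3', 'select 2', 'events$20240108', 310000)
    ])
)
,keyed_jobs as
(
    select
        j.job_id
        ,character_length(j.query) as query_length
        ,sha512(j.query) as query_sha512
        -- Collapse a partition decorator so partitions share one key.
        ,regexp_replace(
            j.table_id
            ,r'\$.*'
            ,'$<partition>'
        ) as destination_table_id
        ,j.creation_to_end_ms
    from sample_jobs j
)
,ranked_jobs as
(
    select
        j.*
        ,row_number() over (
            partition by
                j.query_length
                ,j.query_sha512
                ,j.destination_table_id
            order by
                j.creation_to_end_ms desc
        ) as job_rank
    from keyed_jobs j
)
select
    j.job_id
    ,j.destination_table_id
    ,j.creation_to_end_ms
from ranked_jobs j
where
    j.job_rank = 1
order by
    j.job_id
;
```

This keeps job_2 (the slower of the two 'select 1' jobs) and job_3, each with destination_table_id 'events$<partition>'.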

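For each distinct recent query, the query finds matching jobs from the last prior_days days: jobs with the same project_id, statement_type, query text, and destination_table whose creation time of day and total_bytes_processed fall within the thresholds described above. Statistics are generated for each distinct recent query based on these matching jobs, and distinct queries with only a single matching job are dropped (job_count must be greater than 1).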
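
As a rough illustration of the total_bytes_processed condition, the sketch below evaluates the same abs(safe_divide(...)) expression from the join in jobs against one invented recent job and two invented prior jobs. The aliases r and p, the byte counts, and the abs_pct_diff and is_match column names are assumptions made for the example.

```sql
-- Same default as the script variable above: 0.10 means 10 percent.
declare threshold_total_bytes_processed_pct float64 default 0.10;

select
    p.prior_job_id
    ,abs(
        safe_divide(
            p.total_bytes_processed
                - r.total_bytes_processed
            ,r.total_bytes_processed
        )
    ) as abs_pct_diff
    ,(
        abs(
            safe_divide(
                p.total_bytes_processed
                    - r.total_bytes_processed
                ,r.total_bytes_processed
            )
        ) <= threshold_total_bytes_processed_pct
        -- A recent job with no byte count matches on this condition.
        or r.total_bytes_processed is null
    ) as is_match
from
    -- Hypothetical recent job that processed 1000 bytes.
    (select 1000 as total_bytes_processed) r
cross join
    -- Hypothetical prior jobs.
    unnest([
        struct('prior_1' as prior_job_id, 1050 as total_bytes_processed)
        ,struct('prior_2', 1200)
    ]) p
order by
    p.prior_job_id
;
```

prior_1 differs by 0.05 and is a match; prior_2 differs by 0.2 and is not.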

The final result shows statistics per project. Several fields count recent queries by the difference between the recent query's elapsed time and the average elapsed time of its matching jobs. E.g., diff_creation_to_end_30m_to_inf_query_count gives the count of recent queries where that difference is greater than or equal to 30 minutes. The per-project worst-case recent job is the one with the largest such difference, and recent_job_id identifies it. All the following metrics relate to this job and its matching jobs. E.g., recent_creation_to_end_ms is the worst-case recent job's creation_to_end_ms, the number of milliseconds between creation_time and end_time. diff_creation_to_end_ms is the difference between the worst-case recent job's creation_to_end_ms and the average creation_to_end_ms of the matching jobs. diff_creation_to_end_ms_pct is the related percent difference.
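
A small worked example may help with the diff fields. The sketch below reproduces the arithmetic of queries and queries_diffs on three invented jobs; matching_jobs, query_stats, and the is_recent flag are hypothetical stand-ins. It assumes, as the join in jobs suggests, that the recent job is itself one of the matching jobs and therefore contributes to the average.

```sql
with
matching_jobs as
(
    -- Hypothetical jobs for one distinct query.
    select
        *
    from unnest([
        struct(
            'recent' as job_id
            ,true as is_recent
            ,900000 as creation_to_end_ms
        )
        ,struct('prior_1', false, 300000)
        ,struct('prior_2', false, 360000)
    ])
)
,query_stats as
(
    select
        max(
            if(j.is_recent, j.creation_to_end_ms, null)
        ) as recent_creation_to_end_ms
        ,sum(j.creation_to_end_ms) as creation_to_end_ms
        ,count(*) as job_count
    from matching_jobs j
)
select
    q.recent_creation_to_end_ms
    -- Average is 1560000 / 3 = 520000, so the diff is 380000 ms.
    ,q.recent_creation_to_end_ms
        - cast(q.creation_to_end_ms / q.job_count as int64)
        as diff_creation_to_end_ms
    -- Fractional difference relative to the average (about 0.73 here).
    ,safe_divide(
        q.recent_creation_to_end_ms
            - q.creation_to_end_ms / q.job_count
        ,q.creation_to_end_ms / q.job_count
    ) as diff_creation_to_end_ms_pct
from query_stats q
;
```

A difference of 380000 ms is between 5 and 15 minutes, so this query would be counted in diff_creation_to_end_5m_to_15m_query_count for its project.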

The query below is similar to the one above, with a few differences. A number_of_top_queries script variable limits the number of top recent queries shown per project, and the final result shows information only about those top recent queries.
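
One way such a per-project limit can be expressed is sketched below. This is an assumption about the general technique, not an excerpt from the query that follows: the sample rows and the query_rank column are invented, and only the number_of_top_queries variable name and the ordering by diff_creation_to_end_ms come from the text.

```sql
-- Hypothetical small limit so the effect is visible on four rows.
declare number_of_top_queries int64 default 2;

with
queries_diffs as
(
    -- Hypothetical per-query results.
    select
        *
    from unnest([
        struct(
            'proj_a' as project_id
            ,'job_1' as recent_job_id
            ,900000 as diff_creation_to_end_ms
        )
        ,struct('proj_a', 'job_2', 120000)
        ,struct('proj_a', 'job_3', 45000)
        ,struct('proj_b', 'job_4', 30000)
    ])
)
,queries_ranked as
(
    select
        q.*
        ,row_number() over (
            partition by
                q.project_id
            order by
                q.diff_creation_to_end_ms desc
        ) as query_rank
    from queries_diffs q
)
select
    q.project_id
    ,q.recent_job_id
    ,q.diff_creation_to_end_ms
from queries_ranked q
where
    -- Keep the top N queries per project instead of only the top one.
    q.query_rank <= number_of_top_queries
order by
    q.project_id
    ,q.diff_creation_to_end_ms desc
;
```

With the limit set to 2, this returns job_1 and job_2 for proj_a and job_4 for proj_b.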

-- TODO: modify script variables as needed
-- End time of "recent" jobs.
declare end_recent_timestamp timestamp default
    current_timestamp
;
-- How far back to go for "recent" jobs.
declare recent_milliseconds int64 default
    2      -- hours
    * 3600 -- seconds/hour
    * 1000 -- milliseconds/second
;
-- Only include a "recent" job if its creation_to_end_ms is greater than or
-- equal to the threshold.
declare threshold_creation_to_end_time_ms int64 default
    5      -- minutes
    * 60   -- seconds/minute
    * 1000 -- milliseconds/second
;
-- How far back to go for history for "recent" jobs.
declare prior_days int64 default
    7
;
-- Only include a prior job as part of history for a "recent" job if the
-- absolute value of the difference in time of day of creation_time between the
-- "recent" job and the prior job is within the threshold.
declare threshold_creation_time_ms int64 default
    2      -- hours
    * 3600 -- seconds/hour
    * 1000 -- milliseconds/second
;
-- Only include a prior job as part of history for a "recent" job if the
-- absolute value of the percent difference in total_bytes_processed between
-- the "recent" job and the prior job is within the threshold.
declare threshold_total_bytes_processed_pct float64 default
    0.10
;
-- number of top queries to display per project
declare number_of_top_queries int64 default
    10
;

declare start_recent_timestamp timestamp default 
    timestamp_sub(
        end_recent_timestamp
        ,interval recent_milliseconds millisecond
    )
;
declare end_date date default
    date(end_recent_timestamp)
;
declare start_date date default
    date_sub(
        end_date
        ,interval prior_days day
    )
;

with
recent_jobs as
(
    select
        j.project_id
        ,j.job_id
        ,j.reservation_id
        ,j.query
        ,j.statement_type
        ,character_length(j.query) as query_length
        ,sha512(j.query) as query_sha512
        ,j.destination_table.project_id as destination_project_id
        ,case
            when
                j.destination_table.dataset_id like '_%'
                and j.destination_table.table_id like 'anon%'
                then '<hidden>'
            else j.destination_table.dataset_id
        end as destination_dataset_id
        ,case
            when
                j.destination_table.dataset_id like '_%'
                and j.destination_table.table_id like 'anon%'
                then '<anonymous>'
            when
                j.destination_table.table_id like '%$%'
                then
                regexp_replace(
                    j.destination_table.table_id
                    ,'$.*'
                    ,'$<partition>'
                )
            else j.destination_table.table_id
        end as destination_table_id
        ,j.creation_time
        ,j.end_time
        ,coalesce(
            j.end_time
            ,current_timestamp
        ) as end_or_current_time
        ,j.total_bytes_processed
        ,j.timeline[
            safe_ordinal(array_length(j.timeline))
        ].completed_units
        ,(
            select
                sum(js.shuffle_output_bytes)
            from unnest(j.job_stages) js
        ) as shuffle_output_bytes
        ,(
            select
                sum(js.shuffle_output_bytes_spilled)
            from unnest(j.job_stages) js
        ) as shuffle_output_bytes_spilled
        ,j.job_stages[
            safe_ordinal(array_length(j.job_stages))
        ].records_written as final_records_written
        ,j.total_slot_ms
        ,timestamp_diff(
            coalesce(
                j.end_time
                ,current_timestamp
            )
            ,j.creation_time
            ,millisecond
        ) as creation_to_end_ms
-- TODO: comment/uncomment lines as needed to choose jobs source
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION` j
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT` j
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_USER` j
--     from `PROJECT_ID.ARCHIVE_DATASET.JOBS` j
    -- default to this query's billing project
--     from `region-us.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION` j
--     from `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT` j
    from `region-us.INFORMATION_SCHEMA.JOBS_BY_USER` j
    where
        j.creation_time between start_recent_timestamp and end_recent_timestamp
        and j.job_type = 'QUERY'
        and j.statement_type <> 'SCRIPT'
        and j.total_slot_ms > 0
        and timestamp_diff(
            coalesce(
                j.end_time
                ,current_timestamp
            )
            ,j.creation_time
            ,millisecond
        ) >= threshold_creation_to_end_time_ms
-- TODO: modify filters as needed
        -- some optional recent query filters
--         and j.project_id like 'pr-%'
--         and j.project_id = 'PROJECT_ID'
--         and j.job_id = 'JOB_ID'
)
,recent_jobs_ranked as
(
    select
        j.project_id
        ,j.job_id
        ,j.reservation_id
        ,j.query
        ,j.statement_type
        ,j.query_length
        ,j.query_sha512
        ,struct(
            j.destination_project_id as project_id
            ,j.destination_dataset_id as dataset_id
            ,j.destination_table_id as table_id
        ) as destination_table
        ,j.creation_time
        ,j.end_time
        ,j.end_or_current_time
        ,j.total_bytes_processed
        ,j.completed_units
        ,j.shuffle_output_bytes
        ,j.shuffle_output_bytes_spilled
        ,j.final_records_written
        ,j.total_slot_ms
        ,j.creation_to_end_ms
        ,row_number() over (
            partition by
                j.project_id
                ,j.statement_type
                ,character_length(j.query)
                ,sha512(j.query)
                ,j.destination_project_id
                ,j.destination_dataset_id
                ,j.destination_table_id
            order by
                j.creation_to_end_ms desc
        ) as row_number
    from recent_jobs j
)
,recent_queries as
(
    select
        j.* except (
            row_number
        )
    from recent_jobs_ranked j
    where
        row_number = 1
)
,jobs as
(
    select
        j.project_id
        ,j.job_id
        ,rq.job_id as recent_job_id
        ,rq.creation_time as recent_creation_time
        ,rq.reservation_id as recent_reservation_id
        ,j.query
        ,j.statement_type
        ,struct(
            j.destination_table.project_id
            -- NOTE: '_' and '$' are escaped below so they match literally;
            -- keep these expressions in sync with those used for recent_jobs.
            ,case
                when
                    j.destination_table.dataset_id like r'\_%'
                    and j.destination_table.table_id like 'anon%'
                    then '<hidden>'
                else j.destination_table.dataset_id
            end as dataset_id
            ,case
                when
                    j.destination_table.dataset_id like r'\_%'
                    and j.destination_table.table_id like 'anon%'
                    then '<anonymous>'
                when
                    j.destination_table.table_id like '%$%'
                    then
                    regexp_replace(
                        j.destination_table.table_id
                        ,r'\$.*'
                        ,'$<partition>'
                    )
                else j.destination_table.table_id
            end as table_id
        ) as destination_table
        ,j.creation_time
        ,j.end_time
        ,coalesce(
            j.end_time
            ,current_timestamp
        ) as end_or_current_time
        ,j.total_bytes_processed
        ,j.timeline[
            safe_ordinal(array_length(j.timeline))
        ].completed_units as completed_units
        ,(
            select
                sum(js.shuffle_output_bytes)
            from unnest(j.job_stages) js
        ) as shuffle_output_bytes
        ,(
            select
                sum(js.shuffle_output_bytes_spilled)
            from unnest(j.job_stages) js
        ) as shuffle_output_bytes_spilled
        ,j.job_stages[
            safe_ordinal(array_length(j.job_stages))
        ].records_written as final_records_written
        ,j.total_slot_ms
        ,timestamp_diff(
            coalesce(
                j.end_time
                ,current_timestamp
            )
            ,j.creation_time
            ,millisecond
        ) as creation_to_end_ms
-- TODO: comment/uncomment lines as needed to choose jobs source
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION` j
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT` j
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_USER` j
--     from `PROJECT_ID.ARCHIVE_DATASET.JOBS` j
    -- default to this query's billing project
--     from `region-us.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION` j
--     from `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT` j
    from `region-us.INFORMATION_SCHEMA.JOBS_BY_USER` j
    join recent_queries rq
    on
        rq.project_id = j.project_id
        and rq.statement_type = j.statement_type
        and rq.query_length = character_length(j.query)
        and rq.query_sha512 = sha512(j.query)
        and rq.destination_table.project_id = j.destination_table.project_id
        and rq.destination_table.dataset_id =
            -- NOTE: '_' and '$' are escaped below so they match literally;
            -- keep these expressions in sync with those used for recent_jobs.
            case
                when
                    j.destination_table.dataset_id like r'\_%'
                    and j.destination_table.table_id like 'anon%'
                    then '<hidden>'
                else j.destination_table.dataset_id
            end
        and rq.destination_table.table_id =
            case
                when
                    j.destination_table.dataset_id like r'\_%'
                    and j.destination_table.table_id like 'anon%'
                    then '<anonymous>'
                when
                    j.destination_table.table_id like '%$%'
                    then
                    regexp_replace(
                        j.destination_table.table_id
                        ,r'\$.*'
                        ,'$<partition>'
                    )
                else j.destination_table.table_id
            end
        and
            abs(
                time_diff(
                    time(rq.creation_time)
                    ,time(j.creation_time)
                    ,millisecond
                )
            ) <= threshold_creation_time_ms
        and (
            abs(
                safe_divide(
                    j.total_bytes_processed
                        - rq.total_bytes_processed
                    ,rq.total_bytes_processed
                )
            ) <= threshold_total_bytes_processed_pct
            or rq.total_bytes_processed is null
        )
    where
        date(j.creation_time) between start_date and end_date
        and j.creation_time <= end_recent_timestamp
        and j.job_type = 'QUERY'
        and j.statement_type <> 'SCRIPT'
        and j.total_slot_ms > 0
)
,queries as
(
    select
        j.project_id
        ,j.query
        ,j.statement_type
        ,character_length(j.query) as query_length
        ,sha512(j.query) as query_hash
        ,struct(
            j.destination_table.project_id
            ,j.destination_table.dataset_id
            ,j.destination_table.table_id
        ) as destination_table
        ,max(
            j.recent_job_id
        ) as recent_job_id
        ,max(
            j.recent_creation_time
        ) as recent_creation_time
        ,max(
            j.recent_reservation_id
        ) as recent_reservation_id
        ,max(
            case
                when j.job_id = j.recent_job_id then j.total_bytes_processed
            end
        ) as recent_total_bytes_processed
        ,max(
            case
                when j.job_id = j.recent_job_id then j.completed_units
            end
        ) as recent_completed_units
        ,max(
            case
                when j.job_id = j.recent_job_id then j.shuffle_output_bytes
            end
        ) as recent_shuffle_output_bytes
        ,max(
            case
                when j.job_id = j.recent_job_id then j.shuffle_output_bytes_spilled
            end
        ) as recent_shuffle_output_bytes_spilled
        ,max(
            case
                when j.job_id = j.recent_job_id then j.final_records_written
            end
        ) as recent_final_records_written
        ,max(
            case
                when j.job_id = j.recent_job_id then j.total_slot_ms
            end
        ) as recent_total_slot_ms
        ,max(
            case
                when j.job_id = j.recent_job_id then j.creation_to_end_ms
            end
        ) as recent_creation_to_end_ms
        ,count(distinct date(j.creation_time)) as date_count
        ,count(*) as job_count
        ,count(
            case
                when
                    j.creation_time between
                        start_recent_timestamp
                        and end_recent_timestamp
                    then
                    1
            end
        ) as recent_job_count
        ,sum(j.total_bytes_processed) as total_bytes_processed
        ,sum(j.completed_units) as completed_units
        ,sum(j.shuffle_output_bytes) as shuffle_output_bytes
        ,sum(j.shuffle_output_bytes_spilled) as shuffle_output_bytes_spilled
        ,sum(j.final_records_written) as final_records_written
        ,sum(j.total_slot_ms) as total_slot_ms
        ,sum(j.creation_to_end_ms) as creation_to_end_ms
        ,max(j.creation_to_end_ms) as max_creation_to_end_ms
    from jobs j
    group by
        j.project_id
        ,j.query
        ,j.statement_type
        ,j.destination_table.project_id
        ,j.destination_table.dataset_id
        ,j.destination_table.table_id
    having
        job_count > 1
)
,queries_diffs as
(
    select
        q.project_id
        ,q.recent_job_id
        ,q.recent_creation_time
        ,q.recent_reservation_id
        ,q.date_count
        ,q.job_count
        ,q.recent_total_bytes_processed
        ,safe_divide(
            q.recent_total_bytes_processed
                - q.total_bytes_processed / q.job_count
            ,q.total_bytes_processed / q.job_count
        ) as diff_total_bytes_processed_pct
        ,q.recent_completed_units
        ,safe_divide(
            q.recent_completed_units
                - q.completed_units / q.job_count
            ,q.completed_units / q.job_count
        ) as diff_completed_units_pct
        ,q.recent_shuffle_output_bytes
        ,safe_divide(
            q.recent_shuffle_output_bytes
                - q.shuffle_output_bytes / q.job_count
            ,q.shuffle_output_bytes / q.job_count
        ) as diff_shuffle_output_bytes_pct
        ,q.recent_shuffle_output_bytes_spilled
        ,safe_divide(
            q.recent_shuffle_output_bytes_spilled
                - q.shuffle_output_bytes_spilled / q.job_count
            ,q.shuffle_output_bytes_spilled / q.job_count
        ) as diff_shuffle_output_bytes_spilled_pct
        ,q.recent_final_records_written
        ,safe_divide(
            q.recent_final_records_written
                - q.final_records_written / q.job_count
            ,q.final_records_written / q.job_count
        ) as diff_final_records_written_pct
        ,q.recent_total_slot_ms
        ,safe_divide(
            q.recent_total_slot_ms
                - q.total_slot_ms / q.job_count
            ,q.total_slot_ms / q.job_count
        ) as diff_total_slot_ms_pct
        ,q.recent_creation_to_end_ms
        ,q.recent_creation_to_end_ms
            - cast(q.creation_to_end_ms / q.job_count as int64)
            as diff_creation_to_end_ms
        ,safe_divide(
            q.recent_creation_to_end_ms
                - q.creation_to_end_ms / q.job_count
            ,q.creation_to_end_ms / q.job_count
        ) as diff_creation_to_end_ms_pct
    from queries q
)
,queries_ranked as
(
    select
        q.*
        ,row_number() over (
            partition by
                q.project_id
            order by
                q.diff_creation_to_end_ms desc
        ) as row_number
    from queries_diffs q
)
-- TODO: uncomment lines if you have access to ASSIGNMENT_CHANGES_BY_PROJECT
-- ,assignments_history_stage as
-- (
--     select
--         ac.change_timestamp as start_time
--         ,coalesce(
--             timestamp_add(
--                 lead(ac.change_timestamp) over (
--                     partition by
--                         ac.project_id
--                         ,ac.assignment_id
--                     order by
--                         ac.change_timestamp
--                 )
--                 ,interval -1 microsecond
--             )
--             ,'9999-12-31 23:59:59.999999'
--         ) as end_time
--         ,ac.action
--         ,ac.project_id
--         ,ac.reservation_name
--         ,ac.assignee_type
--         ,ac.assignee_id
--     -- TODO: modify source as needed
--     from `ADMIN_PROJECT.region-us.INFORMATION_SCHEMA.ASSIGNMENT_CHANGES_BY_PROJECT` ac
-- )
-- ,assignments_history as
-- (
--     select
--         ahs.* except(
--             action
--         )
--     from assignments_history_stage ahs
--     where
--         ahs.action <> 'DELETE'
-- )
select
    q.project_id
-- TODO: comment line and uncomment lines if you have access to ASSIGNMENT_CHANGES_BY_PROJECT
    ,q.recent_reservation_id
--     ,coalesce(
--         q.recent_reservation_id
--         ,concat(
--             ahp.project_id
--             ,':US.'
--             ,ahp.reservation_name
--         )
--         /* need to add logic for handling folder assignments... */
--         ,concat(
--             aho.project_id
--             ,':US.'
--             ,aho.reservation_name
--         )
--     ) as recent_reservation_id
    ,q.recent_job_id
    ,q.date_count
    ,q.job_count
    ,q.recent_total_bytes_processed
    ,q.diff_total_bytes_processed_pct
    ,q.recent_completed_units
    ,q.diff_completed_units_pct
    ,q.recent_shuffle_output_bytes
    ,q.diff_shuffle_output_bytes_pct
    ,q.recent_shuffle_output_bytes_spilled
    ,q.diff_shuffle_output_bytes_spilled_pct
    ,q.recent_final_records_written
    ,q.diff_final_records_written_pct
    ,q.recent_total_slot_ms
    ,q.diff_total_slot_ms_pct
    ,q.recent_creation_to_end_ms
    ,q.diff_creation_to_end_ms
    ,q.diff_creation_to_end_ms_pct
from queries_ranked q
-- TODO: uncomment lines if you have access to ASSIGNMENT_CHANGES_BY_PROJECT
-- left join assignments_history ahp
-- on
--     ahp.assignee_id = q.project_id
--     and q.recent_creation_time between ahp.start_time and ahp.end_time
--     and ahp.assignee_type = 'PROJECT'
-- left join assignments_history aho
-- on
--     q.recent_creation_time between aho.start_time and aho.end_time
--     and aho.assignee_type = 'ORGANIZATION'
where
    q.row_number <= number_of_top_queries
order by
    q.project_id
    ,q.diff_creation_to_end_ms desc
;

The query above could be modified in a few ways.

As written, the query matches prior jobs to recent jobs primarily by exact query text (compared by length and SHA-512 hash), together with project, statement type, and destination table. It is possible to match on modified query text instead; e.g., with comments and literals removed. Alternatively, if something else identifies related jobs (e.g., common job IDs or job labels), that could be used for matching.
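
As a rough illustration of matching on modified query text, the sketch below strips comments and replaces literals with a placeholder before the text is hashed. The function name normalize_query_text and the '?' placeholder are assumptions made for this example and are not part of the query above. The regular expressions are deliberately simple: a comment marker inside a string literal, or a quote inside a comment, will be handled incorrectly, and double-quoted or triple-quoted strings are left untouched.

```sql
-- Hypothetical helper: the name normalize_query_text and the '?' placeholder
-- are assumptions for illustration only.
create temp function normalize_query_text(query_text string)
returns string
as (
    trim(
        regexp_replace(
            regexp_replace(
                regexp_replace(
                    regexp_replace(
                        regexp_replace(
                            query_text
                            -- 1. remove block comments: /* ... */
                            ,r'(?s)/\*.*?\*/'
                            ,' '
                        )
                        -- 2. remove line comments: -- ... and # ...
                        ,r'(?:--|#)[^\n]*'
                        ,' '
                    )
                    -- 3. replace single-quoted string literals
                    ,r"'(?:[^'\\]|\\.)*'"
                    ,'?'
                )
                -- 4. replace standalone numeric literals
                ,r'\b\d+(?:\.\d+)?\b'
                ,'?'
            )
            -- 5. collapse runs of whitespace
            ,r'\s+'
            ,' '
        )
    )
);

-- Example: two runs of the same statement with different literals and
-- comments should produce the same normalized text and hash.
select
    normalize_query_text(q) as normalized_query
    ,sha512(normalize_query_text(q)) as normalized_query_sha512
from unnest([
    "select * from t /* nightly */ where id = 42 and name = 'a' -- note"
    ,"select * from t where id = 7 and name = 'b'"
]) as q
;
```

To use something like this in the query above, character_length(j.query) and sha512(j.query) would be replaced by the same expressions over normalize_query_text(j.query) in the recent jobs expressions and in the join in jobs, and the queries expression would group by the normalized text rather than j.query. In a script, the create temp function statement must come after the declare statements.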