6.3. Query Job Stages

Below is an example of a query that can be used to analyze a query job at the job stage grain.

Search the query for TODO to find the lines where common changes can be made. The jobs_selected common table expression can be modified as needed: choose an appropriate source view (JOBS_BY_USER, JOBS_BY_PROJECT, JOBS_BY_ORGANIZATION, or an archive of one of these) and modify the filters as needed.

with
jobs_selected as
(
    select
        cast(j.creation_time as date) as creation_date
        ,j.project_id
        ,j.job_id
        ,j.start_time
        ,j.end_time
        ,j.job_stages
-- TODO: comment/uncomment lines as needed to choose jobs source
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION` j
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT` j
--     from `PROJECT_ID.region-us.INFORMATION_SCHEMA.JOBS_BY_USER` j
--     from `PROJECT_ID.ARCHIVE_DATASET.JOBS` j
    -- default to this query's billing project
--     from `region-us.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION` j
--     from `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT` j
    from `region-us.INFORMATION_SCHEMA.JOBS_BY_USER` j
    where
        j.job_type = 'QUERY' -- LOAD, QUERY, COPY, or EXTRACT
-- TODO: modify filters as needed
        -- the JOBS_BY_* views are partitioned on creation_time by day;
        -- creation_time is UTC and current_date defaults to UTC
--         and date(j.creation_time, 'America/New_York') = current_date('America/New_York')
        and date(j.creation_time) = current_date
--         and j.project_id = 'PROJECT_ID' -- only needed by JOBS_BY_ORGANIZATION
--         and j.job_id = 'JOB_ID'
--         and j.state = 'RUNNING' -- PENDING, RUNNING, or DONE
)
,job_stages as
(
    select
        j.creation_date
        ,j.project_id
        ,j.job_id
        ,job_stages_offset
        ,coalesce(
            js.id
            ,0 -- handle bug where first element id is null
        ) as stage_id -- note that sometimes numbers are skipped
        ,js.name
        ,js.status
        ,case
            when ends_with(js.name, ': Repartition') then
                js.input_stages[
                    ordinal(
                        array_length(
                            js.input_stages
                        )
                    )
                ]
            else
                coalesce(
                    js.id
                    ,0 -- handle bug where first element id is null
                )
        end as primary_stage_id
        ,case
            when ends_with(js.name, ': Repartition') then 1
            else 0
        end as repartition_flag
        ,timestamp_millis(js.start_ms) as start_time
        ,timestamp_millis(js.end_ms) as end_time
        ,js.parallel_inputs
        ,case
            when js.parallel_inputs is not null
                then coalesce(
                    js.completed_parallel_inputs
                    ,0
                )
            else js.parallel_inputs
        end as completed_parallel_inputs
        ,js.wait_ms_avg
        ,js.wait_ms_max
        ,js.read_ms_avg
        ,js.read_ms_max
        ,js.compute_ms_avg
        ,js.compute_ms_max
        ,js.write_ms_avg
        ,js.write_ms_max
        ,js.slot_ms
        ,js.shuffle_output_bytes
        ,js.shuffle_output_bytes_spilled
        ,js.records_read
        ,js.records_written
        ,js.input_stages
        ,(
            select
                string_agg(
                    substr(
                        s.substeps[
                            ordinal(
                                case
                                    when array_length(s.substeps) = 1 then 1
                                    else 2
                                end
                            )
                        ]
                        ,6 -- strip the leading 'FROM ' prefix
                    )
                    ,'\n'
                    order by
                        offset
                )
            from unnest(js.steps) s
                with offset
            where
                s.kind = 'READ'
        ) read_from
        ,(
            select
                string_agg(
                    concat(
                        s.kind
                        ,'\n'
                        ,(
                            select
                                string_agg(
                                    substeps
                                    ,'\n'
                                    order by
                                        offset
                                )
                            from unnest(s.substeps) substeps
                                with offset
                        )
                    )
                    ,'\n\n'
                    order by
                        offset
                )
            from unnest(js.steps) s
                with offset
        ) as steps
    from jobs_selected j
    join unnest(j.job_stages) js
        with offset job_stages_offset
)
,primary_job_stages as
(
    select
        js.project_id
        ,js.job_id
        ,js.primary_stage_id as stage_id
        ,min(js.start_time) as start_time
        ,max(js.end_time) as end_time
        ,sum(js.slot_ms) as slot_ms
        ,sum(
            repartition_flag
        ) as repartition_count
    from job_stages js
    group by
        js.project_id
        ,js.job_id
        ,js.primary_stage_id
)
-- note: all_intervals is not referenced by the final select; it splits
-- each job's stage start/end times into elementary intervals
,all_intervals as
(
    select
        ai.project_id
        ,ai.job_id
        ,ai.start_time
        ,ai.end_time
    from (
        select
            se.project_id
            ,se.job_id
            ,se.a_time as start_time
            ,lead(
                se.a_time
                ,1
                ,null
            ) over (
                partition by
                    se.project_id
                    ,se.job_id
                order by
                    se.a_time
            ) as end_time
        from (
            select
                js.project_id
                ,js.job_id
                ,js.start_time as a_time
            from job_stages js
            union distinct
            select
                js.project_id
                ,js.job_id
                ,js.end_time as a_time
            from job_stages js
        ) se
    ) ai
    where
        ai.end_time is not null
)
select
    js.creation_date
    ,js.project_id
    ,js.job_id
    ,js.job_stages_offset
    ,js.stage_id
    ,js.primary_stage_id
    ,js.name
    ,pjs.stage_id is not null as is_primary_stage
    ,js.status
    -- with this format, value can be treated as datetime in Excel;
    -- format in Excel as yyyy-mm-dd hh:mm:ss.000
    ,format_timestamp(
        '%Y-%m-%d %H:%M:%E*S'
        ,js.start_time
    ) as start_time
    ,format_timestamp(
        '%Y-%m-%d %H:%M:%E*S'
        ,js.end_time
    ) as end_time
    ,timestamp_diff(
        js.end_time
        ,js.start_time
        ,millisecond
    ) as elapsed_ms
    ,js.slot_ms
    ,pjs.repartition_count
    ,format_timestamp(
        '%Y-%m-%d %H:%M:%E*S'
        ,pjs.start_time
    ) as related_start_time
    ,format_timestamp(
        '%Y-%m-%d %H:%M:%E*S'
        ,pjs.end_time
    ) as related_end_time
    ,timestamp_diff(
        pjs.end_time
        ,pjs.start_time
        ,millisecond
    ) as related_elapsed_ms
    ,pjs.slot_ms as related_slot_ms
    ,js.parallel_inputs
    ,js.completed_parallel_inputs
    ,cast(js.completed_parallel_inputs as float64)
        / nullif(js.parallel_inputs, 0)
        as completed_parallel_inputs_percent
    ,js.wait_ms_avg
    ,js.wait_ms_max
    ,js.read_ms_avg
    ,js.read_ms_max
    ,js.compute_ms_avg
    ,js.compute_ms_max
    ,1.0 -
        js.compute_ms_avg
        / nullif(js.compute_ms_max, 0)
        as compute_skew
    ,js.write_ms_avg
    ,js.write_ms_max
    ,js.shuffle_output_bytes
    ,js.shuffle_output_bytes_spilled
    ,js.records_read
    ,js.records_written
    ,array_to_string(
        (
            select
                array_agg(
                    cast(input_stages as string)
                    order by
                        input_stages
                )
            from unnest(js.input_stages) input_stages
        )
        ,','
    ) as input_stages
    ,js.read_from
    ,substr(
        js.steps
        ,1
        ,32767 -- maximum number of characters an Excel cell can contain
    ) as steps
from job_stages js
left join primary_job_stages pjs
on
    pjs.project_id = js.project_id
    and pjs.job_id = js.job_id
    and pjs.stage_id = js.stage_id
order by
    js.creation_date
    ,js.project_id
    ,js.job_id
    ,js.stage_id
;

The query preserves a row for every stage as Google reports it, but it adds some fields built around the idea of primary stages and related stages. A primary stage is a non-repartitioning stage (e.g., an input stage or a join stage). A set of related stages is a single primary stage plus all of the repartitioning stages associated with it. Sometimes it is useful to see all of the stages, and sometimes it is more useful to focus on the primary stages; filter on the computed is_primary_stage field to see only the primary stages. The computed related_* metrics are intended to represent the metrics over the interval of a single set of related stages.
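For example, once the query's results have been materialized, the primary stages can be isolated with a simple filter. A minimal sketch, assuming the results were saved to a table named RESULTS_DATASET.job_stage_results (a hypothetical name):

```sql
-- a minimal sketch; RESULTS_DATASET.job_stage_results is a hypothetical
-- table holding the results of the query above
select
    r.project_id
    ,r.job_id
    ,r.stage_id
    ,r.name
    ,r.related_elapsed_ms
    ,r.related_slot_ms
from `RESULTS_DATASET.job_stage_results` r
where
    r.is_primary_stage
order by
    r.project_id
    ,r.job_id
    ,r.stage_id
;
```

Each row of this result represents one set of related stages, with the related_* metrics covering the primary stage and its repartitioning stages.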

Fields

creation_date

creation_time cast to a date in the UTC time zone.

project_id

Job project_id.

job_id

Job job_id.

job_stages_offset

Zero-based offset of job_stages array.

stage_id

Based on the job stage id: coalesce(id, 0). The id field is always null for the first element (a bug the query works around by substituting 0). Monotonically increasing but not necessarily contiguous.

primary_stage_id

stage_id of the primary stage for this set of related stages.

name

Job stage name. While a job is running, the name can be misleading (e.g., S00: Output); once the job is done, the name for the same stage changes to something that makes sense (e.g., S00: Input).

is_primary_stage

Whether or not this stage is a primary stage: true or false.

status

Job stage status:

'PENDING'

The job encountered an error or cancellation after this stage was added but before it started.

'RUNNING'

The stage is in the interval between job stage start_ms and job stage end_ms.

'CANCELLED'

The job was cancelled after this stage was added but before it could complete.

'FAILED'

An error occurred in this stage.

'COMPLETE'

After job stage end_ms; successful.

start_time

Job stage start_ms converted to timestamp. null until the job is done.

end_time

Job stage end_ms converted to timestamp. null until the job is done.

elapsed_ms

end_ms - start_ms.

slot_ms

Job stage slot milliseconds. This will not reflect the true value until the job has completed.

repartition_count

Number of repartitioning stages associated with the primary stage.

related_start_time

Earliest start_time for a set of related stages.

related_end_time

Latest end_time for a set of related stages.

related_elapsed_ms

related_end_time - related_start_time.

related_slot_ms

Sum of job stage slot_ms for a set of related stages. This will not reflect the true value until the job has completed.

parallel_inputs

Number of units of work.

completed_parallel_inputs

Number of completed units of work.

completed_parallel_inputs_percent

completed_parallel_inputs / parallel_inputs. Google indicates that it is possible for a stage to complete without completing all units of work.
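A quick worked example with hypothetical values, using the same nullif guard as the query to avoid division by zero:

```sql
-- hypothetical values: 150 of 200 units of work completed
select
    cast(150 as float64)
        / nullif(200, 0)
        as completed_parallel_inputs_percent -- 0.75
;
```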

wait_ms_avg

Job stage wait_ms_avg.

wait_ms_max

Job stage wait_ms_max.

read_ms_avg

Job stage read_ms_avg.

read_ms_max

Job stage read_ms_max.

compute_ms_avg

Job stage compute_ms_avg.

compute_ms_max

Job stage compute_ms_max.

compute_skew

1.0 - compute_ms_avg / compute_ms_max.
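A value near 0.0 means the slowest unit of work took about as long as the average; a value near 1.0 means at least one unit took far longer. A worked example with hypothetical values, using the same nullif guard as the query:

```sql
-- hypothetical values: average compute of 200 ms against a maximum of
-- 1,000 ms; the slowest unit of work took five times the average
select
    1.0 -
        200
        / nullif(1000, 0)
        as compute_skew -- 0.8
;
```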

write_ms_avg

Job stage write_ms_avg.

write_ms_max

Job stage write_ms_max.

shuffle_output_bytes

Job stage shuffle_output_bytes.

shuffle_output_bytes_spilled

Job stage shuffle_output_bytes_spilled.

records_read

Job stage records_read. null until the job is done.

records_written

Job stage records_written. null until the job is done.

input_stages

Comma-delimited list of input stages.

read_from

Newline-delimited list of input tables and stages.

steps

Newline-delimited list of steps and substeps.