Skip to content

stat

QueueStatus

Bases: BaseModel

QueueStatus class based on Pydantic BaseModel.

Attributes:

Name Type Description
user str

User name.

queue str

Queue name.

watch bool

Whether to watch the queue.

slurm bool

Whether to check SLURM queues.

sge bool

Whether to check SGE queues.

sgejoblist Dict[str, List[JobListT]]

SGE jobs grouped by state; keys are 'running' and 'pending'.

table Table

rich.Table with job information.

Source code in hpcman/queue/stat.py
class QueueStatus(BaseModel):
    """Collect SGE and/or SLURM jobs for a user and render them as a rich table.

    Attributes:
        user (str): User name (must not contain whitespace).
        queue (Optional[str]): SGE queue (``qstat -q``) / SLURM partition (``squeue -p``)
            to restrict the query to.
        watch (bool): Whether to keep refreshing the table until interrupted.
        debug (bool): Read ``./qstat.xml`` / ``./squeue.json`` instead of calling the schedulers.
        sge (bool): Check SGE queues.
        slurm (bool): Check SLURM queues.
        sgejoblist (Optional[Dict[str, List[JobListT]]]): SGE jobs keyed by 'running' and 'pending'.
        slurmjoblist (Optional[Dict[str, List[Any]]]): SLURM job dicts keyed by 'running' and 'pending'.
        table (Optional[Table]): rich.Table with job information.
        queuefilt (Optional[List[str]]): Currently unused.
    """

    class Config:
        # Allow fields of arbitrary (non-pydantic) types such as rich's Table.
        arbitrary_types_allowed = True

    user: constr(regex=r"^[^\s]+$")  # type: ignore
    queue: Optional[constr(regex=r"^[^\s]+$")]  # type: ignore
    watch: bool
    debug: bool
    sge: bool
    slurm: bool
    # retry: Optional[int]
    # maxretries: Optional[int]
    sgejoblist: Optional[Dict[str, List[JobListT]]]
    slurmjoblist: Optional[Dict[str, List[Any]]]
    table: Optional[Table]
    queuefilt: Optional[List[str]]

    def get_sge_job_list(self) -> None:
        """Query qstat for the user's jobs and store them in ``self.sgejoblist``.

        Raises:
            NoJobsFound: If qstat reports neither running nor pending jobs.
        """
        jobinfo = run_qstat(["qstat", "-xml", "-u", self.user], self.queue, self.debug)
        sgejoblist: Dict[str, List[JobListT]] = {}
        # Running jobs are listed per queue instance; pending jobs live under job_info.
        sgejoblist["running"] = [y for x in jobinfo.queue_info for y in x.job_list]
        sgejoblist["pending"] = [y for x in jobinfo.job_info for y in x.job_list]

        if sgejoblist["pending"] or sgejoblist["running"]:
            self.sgejoblist = sgejoblist
        else:
            raise NoJobsFound(f"No jobs found for user '{self.user}' in SGE queue '{self.queue}'.")

    def get_slurm_job_list(self) -> None:
        """Query squeue for the user's jobs and store them in ``self.slurmjoblist``.

        Only jobs whose ``job_state`` is exactly "RUNNING" or "PENDING" are kept.

        Raises:
            NoJobsFound: If squeue reports neither running nor pending jobs.
        """
        jobinfo = run_squeue(["squeue", "--json", "-u", self.user], self.queue, self.debug)
        slurmjoblist: Dict[str, List[Any]] = {}
        slurmjoblist["running"] = [x for x in jobinfo if x["job_state"] == "RUNNING"]
        slurmjoblist["pending"] = [x for x in jobinfo if x["job_state"] == "PENDING"]

        if slurmjoblist["pending"] or slurmjoblist["running"]:
            self.slurmjoblist = slurmjoblist
        else:
            raise NoJobsFound(f"No jobs found for user '{self.user}' in SLURM queue '{self.queue}'.")

    def get_sge_job_rows(self, types: Iterable[str]) -> List[Dict[str, str]]:
        """Generate table rows for the SGE jobs stored in ``self.sgejoblist``.

        Args:
            types: Job categories to include, in order (e.g. ``("running", "pending")``).

        Returns:
            One dict per job mapping column names to rich-markup strings; empty if
            ``self.sgejoblist`` is None.
        """
        rows: List[Dict[str, str]] = []
        # Job attribute -> table column. Several attributes feed "time"; the last
        # non-None one (in this order) wins.
        keys = {
            "jb_job_number": "runid",
            "jb_name": "runname",
            "jb_owner": "user",
            "state": "code",
            "jb_start_time": "time",
            "jat_start_time": "time",
            "jb_submission_time": "time",
            "queue_name": "queue@host",
            "slots": "cpus",
            "tasks": "task",
        }
        if self.sgejoblist is None:
            return rows
        # .get: do not crash when USER is unset; the user column is simply not highlighted.
        current_user = environ.get("USER")
        for jobtype in types:
            colour = "green" if jobtype == "running" else "yellow"
            for job in self.sgejoblist[jobtype]:
                data = {"qtype": "SGE", "state": f"[{colour}]{jobtype}[/{colour}]"}
                for k, v in keys.items():
                    if v == "time" and getattr(job, k, None) is None:
                        continue
                    if v == "queue@host" and jobtype == "running":
                        # Expected form is "queue@host.domain"; show only the short host name.
                        # partition() tolerates a missing "@" (or None) instead of raising.
                        qname, _, host = (getattr(job, k, None) or "").partition("@")
                        host = host.split(".", 1)[0]
                        if host:
                            data[v] = f"[magenta]{qname}[/magenta]@[cyan]{host}[/cyan]"
                        else:
                            data[v] = f"[magenta]{qname}[/magenta]"
                    elif v == "task":
                        value = getattr(job, k)
                        if value is None:
                            data[v] = ""
                            data["jobtype"] = "[blue]batch[/blue]"
                        else:
                            data[v] = str(value)
                            data["jobtype"] = "[yellow]array[/yellow]"
                    elif v == "user" and job.jb_owner == current_user:
                        data[v] = f"[green bold]{job.jb_owner}[/green bold]"
                    elif v == "code":
                        code = str(job.state)
                        if "d" in code or "t" in code:
                            data[v] = f"[dark_orange]{code}[/dark_orange]"
                        elif "E" in code or "a" in code:
                            data[v] = f"[red]{code}[/red]"
                        elif "qw" in code:
                            data[v] = f"[yellow]{code}[/yellow]"
                        else:
                            data[v] = f"[green]{code}[/green]"
                    else:
                        data[v] = str(getattr(job, k))
                rows.append(data)

        return rows

    def get_slurm_job_rows(self, types: Iterable[str]) -> List[Dict[str, str]]:
        """Generate table rows for the SLURM jobs stored in ``self.slurmjoblist``.

        Args:
            types: Job categories to include, in order (e.g. ``("running", "pending")``).

        Returns:
            One dict per job mapping column names to rich-markup strings; empty if
            ``self.slurmjoblist`` is None.
        """

        def _number(value: Any) -> Any:
            # NOTE(review): integer fields in `squeue --json` appear either as plain
            # values or as {"set", "infinite", "number"} objects depending on the Slurm
            # version; accept both. An unset object maps to None.
            if isinstance(value, dict):
                return value.get("number") if value.get("set") else None
            return value

        rows: List[Dict[str, str]] = []
        keys = {
            "job_id": "runid",
            "name": "runname",
            "user_name": "user",
            "submit_time": "time",
            "start_time": "time",
            "partition": "queue@host",
            "cpus": "cpus",
            "array_task_id": "task",
        }
        if self.slurmjoblist is None:
            return rows
        current_user = environ.get("USER")
        for jobtype in types:
            if jobtype == "running":
                base = {"qtype": "SLURM", "state": f"[green]{jobtype}[/green]", "code": "[green]R[/green]"}
            else:
                base = {"qtype": "SLURM", "state": f"[yellow]{jobtype}[/yellow]", "code": "[yellow]PD[/yellow]"}
            for job in self.slurmjoblist[jobtype]:
                data = dict(base)
                for k, v in keys.items():
                    if v == "time":
                        timestamp = _number(job.get(k))
                        if timestamp is None:
                            continue
                        # start_time is processed after submit_time, so it wins when set.
                        data[v] = datetime.fromtimestamp(timestamp).isoformat()
                    elif v == "queue@host" and jobtype == "running":
                        data[v] = f"[magenta]{job.get(k)}[/magenta]@[cyan]{job.get('nodes')}[/cyan]"
                    elif v == "cpus":
                        cpus = job[k]
                        data[v] = str(cpus["number"] if isinstance(cpus, dict) else cpus)
                    elif v == "task":
                        task = _number(job.get(k))
                        if task is None:
                            data[v] = ""
                            data["jobtype"] = "[blue]batch[/blue]"
                        else:
                            data[v] = str(task)
                            data["jobtype"] = "[yellow]array[/yellow]"
                    elif v == "user" and job.get(k) == current_user:
                        data[v] = f"[green bold]{job.get(k)}[/green bold]"
                    else:
                        data[v] = str(job.get(k))
                rows.append(data)

        return rows

    def get_job_status_table(self) -> None:
        """Build a rich.Table from the collected SGE and SLURM jobs and store it in ``self.table``.

        Borders are omitted when stdout is not a TTY and watch mode is off.
        """
        header = ("qtype", "state", "runid", "runname", "user", "code", "time", "queue@host", "cpus", "task", "jobtype")
        types = ("running", "pending")
        rows = [*self.get_sge_job_rows(types), *self.get_slurm_job_rows(types)]

        borders = box.HEAVY_HEAD
        if not sys.stdout.isatty() and not self.watch:
            borders = None
        title = "Job Status" if not self.watch else "Job Status [dim](ctrl+c to stop)[/dim]"
        table = Table(*header, title=title, box=borders)
        for row in rows:
            # A row may lack a column (e.g. "time" when every timestamp is unset);
            # render an empty cell rather than raising KeyError.
            table.add_row(*[row.get(x, "") for x in header])
        self.table = table

    def generate_table_view(self) -> None:
        """Refresh job lists from the enabled schedulers and rebuild ``self.table``.

        If every enabled scheduler reports no jobs, the NoJobsFound errors are printed
        and the process exits with status 0.
        """
        errors: List[Exception] = []
        if self.sge:
            # Clear previous results so a scheduler that now has no jobs does not keep
            # showing stale rows when the table is regenerated in watch mode.
            self.sgejoblist = None
            try:
                self.get_sge_job_list()
            except NoJobsFound as e:
                errors.append(e)
        if self.slurm:
            self.slurmjoblist = None
            try:
                self.get_slurm_job_list()
            except NoJobsFound as e:
                errors.append(e)

        if len(errors) == sum((self.sge, self.slurm)):
            rprint(errors)
            sys.exit(0)
        self.get_job_status_table()

    def print_table(self, refresh: int = 2) -> None:
        """Print the job table once, or keep refreshing it when ``self.watch`` is set.

        Args:
            refresh: Seconds to sleep between refreshes in watch mode. Defaults to 2.
        """
        if not self.watch:
            console = Console()
            self.generate_table_view()
            console.print(self.table)
            return
        try:
            self.generate_table_view()
            with Live(
                self.table,
                screen=True,
                auto_refresh=False,
                vertical_overflow="visible",
            ) as live:
                while True:
                    time.sleep(refresh)
                    self.generate_table_view()
                    live.update(self.table, refresh=True)  # type: ignore
        except KeyboardInterrupt:
            # ctrl+c is how watch mode is stopped (see the table title); exit cleanly.
            sys.exit(0)

generate_table_view()

Generates a table view of the qstat output in one function.

Source code in hpcman/queue/stat.py
def generate_table_view(self) -> None:
    """Refresh job lists from the enabled schedulers and rebuild ``self.table``.

    If every enabled scheduler reports no jobs, the NoJobsFound errors are printed
    and the process exits with status 0.
    """
    errors: List[Exception] = []
    if self.sge:
        # Clear previous results so a scheduler that now has no jobs does not keep
        # showing stale rows when the table is regenerated in watch mode.
        self.sgejoblist = None
        try:
            self.get_sge_job_list()
        except NoJobsFound as e:
            errors.append(e)
    if self.slurm:
        self.slurmjoblist = None
        try:
            self.get_slurm_job_list()
        except NoJobsFound as e:
            errors.append(e)

    if len(errors) == sum((self.sge, self.slurm)):
        rprint(errors)
        # sys.exit, not the site-provided builtin exit(), which is meant for interactive use.
        sys.exit(0)
    self.get_job_status_table()

get_job_status_table()

Builds the job status table from the collected SGE and SLURM jobs.

Sets the table attribute.

Attributes:

Name Type Description
sgejoblist

A sgejoblist returned by get_sge_job_list.

table

self.table is set in this method.

Source code in hpcman/queue/stat.py
def get_job_status_table(self) -> None:
    """Build a rich.Table from the collected SGE and SLURM jobs and store it in ``self.table``.

    Borders are omitted when stdout is not a TTY and watch mode is off.
    """
    header = ("qtype", "state", "runid", "runname", "user", "code", "time", "queue@host", "cpus", "task", "jobtype")
    types = ("running", "pending")
    rows = [*self.get_sge_job_rows(types), *self.get_slurm_job_rows(types)]

    borders = box.HEAVY_HEAD
    if not sys.stdout.isatty() and not self.watch:
        borders = None
    title = "Job Status" if not self.watch else "Job Status [dim](ctrl+c to stop)[/dim]"
    table = Table(*header, title=title, box=borders)
    for row in rows:
        # A row may lack a column (e.g. "time" when every timestamp is unset);
        # render an empty cell rather than raising KeyError.
        table.add_row(*[row.get(x, "") for x in header])
    self.table = table

get_sge_job_list()

Get the list of job info from the qstat output.

Attributes:

Name Type Description
sgejoblist

self.sgejoblist is a dictionary of job info and is set in this method.

Source code in hpcman/queue/stat.py
def get_sge_job_list(self) -> None:
    """Query qstat for the user's jobs and store them in ``self.sgejoblist``.

    Raises:
        NoJobsFound: If qstat reports neither running nor pending jobs.
    """
    jobinfo = run_qstat(["qstat", "-xml", "-u", self.user], self.queue, self.debug)
    # Running jobs are listed per queue instance; pending jobs live under job_info.
    running = [job for queue_entry in jobinfo.queue_info for job in queue_entry.job_list]
    pending = [job for info_entry in jobinfo.job_info for job in info_entry.job_list]
    if not (pending or running):
        raise NoJobsFound(f"No jobs found for user '{self.user}' in SGE queue '{self.queue}'.")
    self.sgejoblist = {"running": running, "pending": pending}

get_sge_job_rows(types)

Generates rows of SGE jobs for table output printing.

Attributes:

Name Type Description
sgejoblist

A sgejoblist returned by get_sge_job_list.

Returns:

Name Type Description
jobs List[Dict[str, str]]

list of job outputs

Source code in hpcman/queue/stat.py
def get_sge_job_rows(self, types: Iterable[str]) -> List[Dict[str, str]]:
    """Generate table rows for the SGE jobs stored in ``self.sgejoblist``.

    Args:
        types: Job categories to include, in order (e.g. ``("running", "pending")``).

    Returns:
        One dict per job mapping column names to rich-markup strings; empty if
        ``self.sgejoblist`` is None.
    """
    rows: List[Dict[str, str]] = []
    # Job attribute -> table column. Several attributes feed "time"; the last
    # non-None one (in this order) wins.
    keys = {
        "jb_job_number": "runid",
        "jb_name": "runname",
        "jb_owner": "user",
        "state": "code",
        "jb_start_time": "time",
        "jat_start_time": "time",
        "jb_submission_time": "time",
        "queue_name": "queue@host",
        "slots": "cpus",
        "tasks": "task",
    }
    if self.sgejoblist is None:
        return rows
    # .get: do not crash when USER is unset; the user column is simply not highlighted.
    current_user = environ.get("USER")
    for jobtype in types:
        colour = "green" if jobtype == "running" else "yellow"
        for job in self.sgejoblist[jobtype]:
            data = {"qtype": "SGE", "state": f"[{colour}]{jobtype}[/{colour}]"}
            for k, v in keys.items():
                if v == "time" and getattr(job, k, None) is None:
                    continue
                if v == "queue@host" and jobtype == "running":
                    # Expected form is "queue@host.domain"; show only the short host name.
                    # partition() tolerates a missing "@" (or None) instead of raising.
                    qname, _, host = (getattr(job, k, None) or "").partition("@")
                    host = host.split(".", 1)[0]
                    if host:
                        data[v] = f"[magenta]{qname}[/magenta]@[cyan]{host}[/cyan]"
                    else:
                        data[v] = f"[magenta]{qname}[/magenta]"
                elif v == "task":
                    value = getattr(job, k)
                    if value is None:
                        data[v] = ""
                        data["jobtype"] = "[blue]batch[/blue]"
                    else:
                        data[v] = str(value)
                        data["jobtype"] = "[yellow]array[/yellow]"
                elif v == "user" and job.jb_owner == current_user:
                    data[v] = f"[green bold]{job.jb_owner}[/green bold]"
                elif v == "code":
                    code = str(job.state)
                    if "d" in code or "t" in code:
                        data[v] = f"[dark_orange]{code}[/dark_orange]"
                    elif "E" in code or "a" in code:
                        data[v] = f"[red]{code}[/red]"
                    elif "qw" in code:
                        data[v] = f"[yellow]{code}[/yellow]"
                    else:
                        data[v] = f"[green]{code}[/green]"
                else:
                    data[v] = str(getattr(job, k))
            rows.append(data)

    return rows

get_slurm_job_list()

Get the list of job info from the squeue output.

Attributes:

Name Type Description
slurmjoblist

self.slurmjoblist is a dictionary of job info and is set in this method.

Source code in hpcman/queue/stat.py
def get_slurm_job_list(self) -> None:
    """Query squeue for the user's jobs and store them in ``self.slurmjoblist``.

    Only jobs whose ``job_state`` is exactly "RUNNING" or "PENDING" are kept.

    Raises:
        NoJobsFound: If squeue reports neither running nor pending jobs.
    """
    jobs = run_squeue(["squeue", "--json", "-u", self.user], self.queue, self.debug)
    running = [job for job in jobs if job["job_state"] == "RUNNING"]
    pending = [job for job in jobs if job["job_state"] == "PENDING"]
    if not (pending or running):
        raise NoJobsFound(f"No jobs found for user '{self.user}' in SLURM queue '{self.queue}'.")
    self.slurmjoblist = {"running": running, "pending": pending}

get_slurm_job_rows(types)

Generates rows of SLURM jobs for table output printing.

Attributes:

Name Type Description
slurmjoblist

A slurmjoblist returned by get_slurm_job_list.

Returns:

Name Type Description
jobs List[Dict[str, str]]

list of job outputs

Source code in hpcman/queue/stat.py
def get_slurm_job_rows(self, types: Iterable[str]) -> List[Dict[str, str]]:
    """Generate table rows for the SLURM jobs stored in ``self.slurmjoblist``.

    Args:
        types: Job categories to include, in order (e.g. ``("running", "pending")``).

    Returns:
        One dict per job mapping column names to rich-markup strings; empty if
        ``self.slurmjoblist`` is None.
    """

    def _number(value: Any) -> Any:
        # NOTE(review): integer fields in `squeue --json` appear either as plain
        # values or as {"set", "infinite", "number"} objects depending on the Slurm
        # version; accept both. An unset object maps to None.
        if isinstance(value, dict):
            return value.get("number") if value.get("set") else None
        return value

    rows: List[Dict[str, str]] = []
    keys = {
        "job_id": "runid",
        "name": "runname",
        "user_name": "user",
        "submit_time": "time",
        "start_time": "time",
        "partition": "queue@host",
        "cpus": "cpus",
        "array_task_id": "task",
    }
    if self.slurmjoblist is None:
        return rows
    # .get: do not crash when USER is unset; the user column is simply not highlighted.
    current_user = environ.get("USER")
    for jobtype in types:
        if jobtype == "running":
            base = {"qtype": "SLURM", "state": f"[green]{jobtype}[/green]", "code": "[green]R[/green]"}
        else:
            base = {"qtype": "SLURM", "state": f"[yellow]{jobtype}[/yellow]", "code": "[yellow]PD[/yellow]"}
        for job in self.slurmjoblist[jobtype]:
            data = dict(base)
            for k, v in keys.items():
                if v == "time":
                    timestamp = _number(job.get(k))
                    if timestamp is None:
                        continue
                    # start_time is processed after submit_time, so it wins when set.
                    data[v] = datetime.fromtimestamp(timestamp).isoformat()
                elif v == "queue@host" and jobtype == "running":
                    data[v] = f"[magenta]{job.get(k)}[/magenta]@[cyan]{job.get('nodes')}[/cyan]"
                elif v == "cpus":
                    cpus = job[k]
                    data[v] = str(cpus["number"] if isinstance(cpus, dict) else cpus)
                elif v == "task":
                    task = _number(job.get(k))
                    if task is None:
                        data[v] = ""
                        data["jobtype"] = "[blue]batch[/blue]"
                    else:
                        data[v] = str(task)
                        data["jobtype"] = "[yellow]array[/yellow]"
                elif v == "user" and job.get(k) == current_user:
                    data[v] = f"[green bold]{job.get(k)}[/green bold]"
                else:
                    data[v] = str(job.get(k))
            rows.append(data)

    return rows

print_table(refresh=2)

Prints a rich.Table view of the qstat output.

Parameters:

Name Type Description Default
refresh int

Refresh rate for the rich.Table. Defaults to 2.

2
Source code in hpcman/queue/stat.py
def print_table(self, refresh: int = 2) -> None:
    """Print the job table once, or keep refreshing it when ``self.watch`` is set.

    Args:
        refresh: Seconds to sleep between refreshes in watch mode. Defaults to 2.
    """
    if not self.watch:
        console = Console()
        self.generate_table_view()
        console.print(self.table)
        return
    try:
        self.generate_table_view()
        with Live(
            self.table,
            screen=True,
            auto_refresh=False,
            vertical_overflow="visible",
        ) as live:
            while True:
                time.sleep(refresh)
                self.generate_table_view()
                live.update(self.table, refresh=True)  # type: ignore
    except KeyboardInterrupt:
        # ctrl+c is how watch mode is stopped; exit cleanly. sys.exit, not the
        # site-provided builtin exit(), which is meant for interactive use.
        sys.exit(0)

get_queue_list(user=environ['USER'])

Gets list of available queue names and returns the list

Source code in hpcman/queue/stat.py
def get_queue_list(user: Optional[str] = None) -> List[str]:
    """Return the sorted, de-duplicated queue names reported by ``qstat -f -U user``.

    Args:
        user: User passed to ``qstat -U``. Defaults to ``$USER``, read at call time.

    Returns:
        Sorted queue names with any ``@host`` suffix removed.

    Raises:
        KeyError: If ``user`` is not given and USER is not set in the environment.
    """
    if user is None:
        # Resolved at call time (not as a default argument, which is evaluated at
        # import time) so importing this module does not require USER to be set.
        user = environ["USER"]
    jobinfo = run_qstat(["qstat", "-f", "-xml", "-U", user])
    # Queue instances are named "queue@host"; keep only the queue part.
    return sorted(
        {y.name.split("@", 1)[0] for x in jobinfo.queue_info for y in x.queue_list if y.name is not None}
    )

run_qstat(cmd, queue=None, debug=False)

Runs qstat and returns a JobInfo result of parsed XML data

Source code in hpcman/queue/stat.py
def run_qstat(cmd: List[str], queue: Optional[str] = None, debug: bool = False) -> JobInfo:
    """Run qstat and parse its XML output into a JobInfo object.

    Args:
        cmd: qstat command line (e.g. ``["qstat", "-xml", "-u", user]``). Not modified.
        queue: If given, ``-q queue`` is appended to the command. Ignored in debug mode.
        debug: Parse ``./qstat.xml`` instead of running qstat.

    Returns:
        The parsed JobInfo.

    Raises:
        subprocess.CalledProcessError: If qstat exits with a non-zero status.
    """
    if debug:
        xmlfile = Path("./qstat.xml")
        config = ParserConfig(fail_on_unknown_properties=True)
        return XmlParser(config=config).from_path(xmlfile, JobInfo)
    # Build a new list: extending ``cmd`` in place would leak "-q queue" into the caller's list.
    full_cmd = list(cmd) if queue is None else [*cmd, "-q", queue]
    qstat_res = subprocess.run(full_cmd, capture_output=True, check=True, text=True)
    return XmlParser().from_string(qstat_res.stdout, JobInfo)

run_queue_status(user, queue, debug, watch, slurm, sge, queuefilt=None)

Generates a QueueStatus object and prints a rich.Table view.

Parameters:

Name Type Description Default
user str

Username to search the queue status for.

required
queue Optional[str]

Queue to limit the search to.

required
debug bool

Useful for development.

required
watch bool

Refreshes the queue status every 2 seconds.

required
slurm bool

Whether to check SLURM queues.

required
sge bool

Whether to check SGE queues.

required
queuefilt Optional[List[str]]

Unused currently. Could be used to restrict the output. Defaults to None.

None
Source code in hpcman/queue/stat.py
def run_queue_status(
    user: str,
    queue: Optional[str],
    debug: bool,
    watch: bool,
    slurm: bool,
    sge: bool,
    queuefilt: Optional[List[str]] = None,
) -> None:
    """Create a QueueStatus for the given options and print its job table.

    Args:
        user: Username whose jobs are listed.
        queue: Queue/partition to restrict the search to, or None for all.
        debug: Read local ``qstat.xml`` / ``squeue.json`` files instead of calling the schedulers.
        watch: Keep refreshing the table (every 2 seconds) until ctrl+c.
        slurm: Whether to query SLURM.
        sge: Whether to query SGE.
        queuefilt: Currently unused; stored on the QueueStatus. Defaults to None.
    """
    # Job lists and the table start empty; print_table() fills them in.
    status = QueueStatus(
        user=user,
        queue=queue,
        debug=debug,
        watch=watch,
        sge=sge,
        slurm=slurm,
        queuefilt=queuefilt,
        sgejoblist=None,
        slurmjoblist=None,
        table=None,
    )
    status.print_table()

run_squeue(cmd, queue=None, debug=False)

Runs squeue and returns a list of job dicts

Source code in hpcman/queue/stat.py
def run_squeue(cmd: List[str], queue: Optional[str] = None, debug: bool = False) -> List[Dict[str, Any]]:
    """Run squeue (JSON output) and return its list of job dicts.

    Args:
        cmd: squeue command line producing JSON (e.g. ``["squeue", "--json", "-u", user]``).
            Not modified.
        queue: If given, ``-p queue`` is appended to the command. Ignored in debug mode.
        debug: Read ``./squeue.json`` instead of running squeue.

    Returns:
        The ``"jobs"`` list from the JSON document.

    Raises:
        subprocess.CalledProcessError: If squeue exits with a non-zero status.
    """
    if debug:
        # JSON text is UTF-8, so don't depend on the locale's default encoding.
        with Path("./squeue.json").open("rt", encoding="utf-8") as jsonfh:
            return json.load(jsonfh)["jobs"]
    # Build a new list: extending ``cmd`` in place would leak "-p queue" into the caller's list.
    full_cmd = list(cmd) if queue is None else [*cmd, "-p", queue]
    squeue_res = subprocess.run(full_cmd, capture_output=True, check=True, text=True)
    return json.loads(squeue_res.stdout)["jobs"]