Skip to content

avail

QueueAvail

Bases: BaseModel

QueueAvail class based on Pydantic BaseModel.

Attributes:

Name Type Description
user str

User name.

queue str

Queue name.

table Table

rich.Table with job information.

slurm bool

Check slurm queues

sge bool

Check sge queues

gpu bool

Check gpu slurm queues

hide bool

Hide full queues

Source code in hpcman/queue/avail.py
class QueueAvail(BaseModel):
    """QueueAvail class based on Pydantic BaseModel.

    Attributes:
        user (str): User name.
        queue (str): Queue name.
        table (Table): rich.Table with job information.
        slurm (bool): Check slurm queues
        sge (bool): Check sge queues
        gpu (bool): Check gpu slurm queues
        hide (bool): Hide full queues
    """

    class Config:
        arbitrary_types_allowed = True

    user: constr(regex=r"^[^\s]+$")  # type: ignore
    queue: Optional[constr(regex=r"^[^\s]+$")]  # type: ignore
    debug: bool
    # retry: Optional[int]
    # maxretries: Optional[int]
    table: Optional[Table]
    sge: bool
    slurm: bool
    gpu: bool
    hide: bool
    sgequeueinfo: Optional[List[QueueListT]]
    slurmqueueinfo: Optional[List[Dict[str, Any]]]

    def get_sge_queue_list(self) -> None:
        try:
            self.sgequeueinfo = run_qstat_avail(self.user, self.queue, self.debug)
        except OSError as e:
            raise NoQueuesFound("Unable to run qstat") from e

    def get_slurm_queue_list(self) -> None:
        try:
            self.slurmqueueinfo = run_sinfo_avail(self.queue, self.debug)
        except OSError as e:
            raise NoQueuesFound("Unable to run sinfo") from e

    def get_sge_queue_rows(self) -> List[Dict[str, str]]:
        """Generates rows of SGE queues for table output printing.

        Returns:
            jobs: list of job outputs
        """

        rows: List[Dict[str, str]] = []
        keys = {
            "load_avg": "load_avg",
            "slots_total": "q_slots",
            "slots_used": "q_slots_free",
            "state": "q_status",
            "qtype": "q_feat",
            "arch": "arch",
        }
        res_keys = {
            "hostname": "host",
            "mem_total": "ram_total",
            "mem_free": "ram_free",
            "num_proc": "slots_total",
            "qname": "queue/partition",
        }

        if self.sgequeueinfo is None:
            return rows

        for queue in self.sgequeueinfo:
            data = {"qtype": "SGE"}
            for k, v in keys.items():
                if v == "q_status":
                    state = getattr(queue, k)
                    if state is None:
                        data[v] = "normal"
                    else:
                        data[v] = str(state)
                elif v == "q_slots_free":
                    data[v] = str(int(data["q_slots"]) - getattr(queue, k, 0))
                else:
                    data[v] = str(getattr(queue, k))
            for res in queue.resource:
                if res.name in res_keys:
                    if res.name == "hostname":
                        data[res_keys[res.name]] = str(res.value.split(".", 1)[0])
                    else:
                        data[res_keys[res.name]] = str(res.value)
            ram = Ram(total=data.get("ram_total", "0"), free=data.get("ram_free", "0"))
            data["ram_total"] = ram.total.human_readable(decimal=True, gnu=True)
            data["ram_free"] = ram.free.human_readable(decimal=True, gnu=True)
            rows.append(data)

        return rows

    def get_slurm_queue_rows(self) -> List[Dict[str, str]]:
        """Generates rows of slurm queues for table output printing.

        Returns:
            rows: list of queue info
        """
        rows: List[Dict[str, str]] = []

        keys = [
            "host",
            "load_avg",
            "ram_total",
            "ram_free",
            "slots_total",
            "queue/partition",
            "q_slots",
            "q_slots_free",
            "q_status",
            "q_feat",
            "gpu_total",
            "gpu_used",
        ]
        # rprint(sgequeueinfo)

        if self.slurmqueueinfo is None:
            return rows

        for queue in self.slurmqueueinfo:
            data = {"qtype": "SLURM"}
            ram = Ram(total=f'{queue["memory"]["maximum"]}MB', free=f'{queue["memory"]["free"]["maximum"]["number"]}MB')
            for v in keys:
                if v == "host":
                    data[v] = queue["nodes"]["nodes"][0]
                elif v == "ram_total":
                    if getattr(ram, "total", None) is not None:
                        data[v] = ram.total.human_readable(decimal=True, gnu=True)
                elif v == "ram_free":
                    if getattr(ram, "free", None) is not None:
                        data[v] = ram.free.human_readable(decimal=True, gnu=True)
                elif v == "slots_total":
                    data[v] = str(queue["cpus"]["total"])
                elif v == "queue/partition":
                    data[v] = str(queue["partition"]["name"])
                elif v == "load_avg":
                    data[v] = str(queue["cpus"]["load"]["maximum"] / 100)
                elif v == "q_slots":
                    data[v] = str(queue["cpus"]["total"])
                elif v == "q_slots_free":
                    data[v] = str(queue["cpus"]["idle"])
                elif v == "q_status":
                    data[v] = str(queue["node"]["state"][0])
                elif v == "q_feat":
                    data[v] = ""
                elif v == "gpu_total":
                    gpus = [x.split(":", 1)[1] for x in queue["gres"]["total"].split(",") if x.startswith("gpu")]
                    data[v] = "\n".join(gpus)
                elif v == "gpu_used":
                    gpus = [x.split(":", 1)[1] for x in queue["gres"]["used"].split(",") if x.startswith("gpu")]
                    data[v] = "\n".join(gpus)
            rows.append(data)

        return rows

    def get_queue_status_table(self) -> None:
        """Gets queue status table from qstat -xml output.

        Sets the table attribute.

        Attributes:
            table: self.table is set in this method.
        """
        header = [
            "qtype",
            "host",
            "load_avg",
            "ram_total",
            "ram_free",
            "slots_total",
            "queue/partition",
            "q_slots",
            "q_slots_free",
            "q_status",
            "q_feat",
            "gpu_total",
            "gpu_used",
        ]
        rows = []
        rows.extend(self.get_sge_queue_rows())
        rows.extend(self.get_slurm_queue_rows())

        if self.gpu:
            rows = [row for row in rows if row.get("gpu_total") or row.get("gpu_used")]

        borders = box.HEAVY_HEAD
        if not sys.stdout.isatty():
            borders = None
        title = "Queue Info"
        table = Table(*header, title=title, box=borders, expand=True)
        if self.hide:
            rows = [row for row in rows if row.get("q_slots_free") != "0"]
        for row in natsorted(
            rows, key=lambda row: (row.get("qtype"), row.get("q_slots_free"), row.get("ram_free", "0")), reverse=True
        ):
            table.add_row(*[row.get(x, "") for x in header])
        self.table = table

    def generate_table_view(self) -> None:
        """Generates a table view of the qstat output in one function."""
        errors: List[Exception] = []
        if self.sge:
            try:
                self.get_sge_queue_list()
            except NoQueuesFound as e:
                errors.append(e)
        if self.slurm:
            try:
                self.get_slurm_queue_list()
            except NoQueuesFound as e:
                errors.append(e)

        if len(errors) == sum((self.sge, self.slurm)):
            rprint(errors)
            exit(0)
        self.get_queue_status_table()

    def print_table(self) -> None:
        """Prints a rich.Table view of the qstat output.

        Args:
            None
        """
        console = Console()
        self.generate_table_view()
        console.print(self.table)

generate_table_view()

Generates a table view of the qstat output in one function.

Source code in hpcman/queue/avail.py
def generate_table_view(self) -> None:
    """Generates a table view of the qstat output in one function."""
    errors: List[Exception] = []
    if self.sge:
        try:
            self.get_sge_queue_list()
        except NoQueuesFound as e:
            errors.append(e)
    if self.slurm:
        try:
            self.get_slurm_queue_list()
        except NoQueuesFound as e:
            errors.append(e)

    if len(errors) == sum((self.sge, self.slurm)):
        rprint(errors)
        exit(0)
    self.get_queue_status_table()

get_queue_status_table()

Gets queue status table from qstat -xml output.

Sets the table attribute.

Attributes:

Name Type Description
table

self.table is set in this method.

Source code in hpcman/queue/avail.py
def get_queue_status_table(self) -> None:
    """Gets queue status table from qstat -xml output.

    Sets the table attribute.

    Attributes:
        table: self.table is set in this method.
    """
    header = [
        "qtype",
        "host",
        "load_avg",
        "ram_total",
        "ram_free",
        "slots_total",
        "queue/partition",
        "q_slots",
        "q_slots_free",
        "q_status",
        "q_feat",
        "gpu_total",
        "gpu_used",
    ]
    rows = []
    rows.extend(self.get_sge_queue_rows())
    rows.extend(self.get_slurm_queue_rows())

    if self.gpu:
        rows = [row for row in rows if row.get("gpu_total") or row.get("gpu_used")]

    borders = box.HEAVY_HEAD
    if not sys.stdout.isatty():
        borders = None
    title = "Queue Info"
    table = Table(*header, title=title, box=borders, expand=True)
    if self.hide:
        rows = [row for row in rows if row.get("q_slots_free") != "0"]
    for row in natsorted(
        rows, key=lambda row: (row.get("qtype"), row.get("q_slots_free"), row.get("ram_free", "0")), reverse=True
    ):
        table.add_row(*[row.get(x, "") for x in header])
    self.table = table

get_sge_queue_rows()

Generates rows of SGE queues for table output printing.

Returns:

Name Type Description
jobs List[Dict[str, str]]

list of job outputs

Source code in hpcman/queue/avail.py
def get_sge_queue_rows(self) -> List[Dict[str, str]]:
    """Generates rows of SGE queues for table output printing.

    Returns:
        jobs: list of job outputs
    """

    rows: List[Dict[str, str]] = []
    keys = {
        "load_avg": "load_avg",
        "slots_total": "q_slots",
        "slots_used": "q_slots_free",
        "state": "q_status",
        "qtype": "q_feat",
        "arch": "arch",
    }
    res_keys = {
        "hostname": "host",
        "mem_total": "ram_total",
        "mem_free": "ram_free",
        "num_proc": "slots_total",
        "qname": "queue/partition",
    }

    if self.sgequeueinfo is None:
        return rows

    for queue in self.sgequeueinfo:
        data = {"qtype": "SGE"}
        for k, v in keys.items():
            if v == "q_status":
                state = getattr(queue, k)
                if state is None:
                    data[v] = "normal"
                else:
                    data[v] = str(state)
            elif v == "q_slots_free":
                data[v] = str(int(data["q_slots"]) - getattr(queue, k, 0))
            else:
                data[v] = str(getattr(queue, k))
        for res in queue.resource:
            if res.name in res_keys:
                if res.name == "hostname":
                    data[res_keys[res.name]] = str(res.value.split(".", 1)[0])
                else:
                    data[res_keys[res.name]] = str(res.value)
        ram = Ram(total=data.get("ram_total", "0"), free=data.get("ram_free", "0"))
        data["ram_total"] = ram.total.human_readable(decimal=True, gnu=True)
        data["ram_free"] = ram.free.human_readable(decimal=True, gnu=True)
        rows.append(data)

    return rows

get_slurm_queue_rows()

Generates rows of slurm queues for table output printing.

Returns:

Name Type Description
rows List[Dict[str, str]]

list of queue info

Source code in hpcman/queue/avail.py
def get_slurm_queue_rows(self) -> List[Dict[str, str]]:
    """Generates rows of slurm queues for table output printing.

    Returns:
        rows: list of queue info
    """
    rows: List[Dict[str, str]] = []

    keys = [
        "host",
        "load_avg",
        "ram_total",
        "ram_free",
        "slots_total",
        "queue/partition",
        "q_slots",
        "q_slots_free",
        "q_status",
        "q_feat",
        "gpu_total",
        "gpu_used",
    ]
    # rprint(sgequeueinfo)

    if self.slurmqueueinfo is None:
        return rows

    for queue in self.slurmqueueinfo:
        data = {"qtype": "SLURM"}
        ram = Ram(total=f'{queue["memory"]["maximum"]}MB', free=f'{queue["memory"]["free"]["maximum"]["number"]}MB')
        for v in keys:
            if v == "host":
                data[v] = queue["nodes"]["nodes"][0]
            elif v == "ram_total":
                if getattr(ram, "total", None) is not None:
                    data[v] = ram.total.human_readable(decimal=True, gnu=True)
            elif v == "ram_free":
                if getattr(ram, "free", None) is not None:
                    data[v] = ram.free.human_readable(decimal=True, gnu=True)
            elif v == "slots_total":
                data[v] = str(queue["cpus"]["total"])
            elif v == "queue/partition":
                data[v] = str(queue["partition"]["name"])
            elif v == "load_avg":
                data[v] = str(queue["cpus"]["load"]["maximum"] / 100)
            elif v == "q_slots":
                data[v] = str(queue["cpus"]["total"])
            elif v == "q_slots_free":
                data[v] = str(queue["cpus"]["idle"])
            elif v == "q_status":
                data[v] = str(queue["node"]["state"][0])
            elif v == "q_feat":
                data[v] = ""
            elif v == "gpu_total":
                gpus = [x.split(":", 1)[1] for x in queue["gres"]["total"].split(",") if x.startswith("gpu")]
                data[v] = "\n".join(gpus)
            elif v == "gpu_used":
                gpus = [x.split(":", 1)[1] for x in queue["gres"]["used"].split(",") if x.startswith("gpu")]
                data[v] = "\n".join(gpus)
        rows.append(data)

    return rows

print_table()

Prints a rich.Table view of the qstat output.

Source code in hpcman/queue/avail.py
def print_table(self) -> None:
    """Prints a rich.Table view of the qstat output.

    Args:
        None
    """
    console = Console()
    self.generate_table_view()
    console.print(self.table)

Ram

Bases: BaseModel

Ram class to enable pretty printing of MB/GB/TB/etc

Attributes:

Name Type Description
total ByteSizeGnu

Total RAM.

free ByteSizeGnu

Free RAM.

Source code in hpcman/queue/avail.py
class Ram(BaseModel):
    """Ram class to enable pretty printing of MB/GB/TB/etc

    Attributes:
        total (ByteSizeGnu): Total RAM.
        free (ByteSizeGnu): Free RAM.
    """

    total: ByteSizeGnu
    free: ByteSizeGnu

run_qstat_avail(user, queue=None, debug=False)

Runs qstat and returns a JobInfo result of parsed XML data

Source code in hpcman/queue/avail.py
def run_qstat_avail(user: str, queue: Optional[str] = None, debug: bool = False) -> List[QueueListT]:
    """Runs qstat and returns a JobInfo result of parsed XML data"""
    cmd = ["qstat", "-F", "mem_free,mem_total,mem_used,h,p,q", "-xml", "-f", "-U", user]
    config = ParserConfig(fail_on_unknown_properties=True)
    if debug:
        xmlfile = Path("./qavail.xml")
        # rprint(xmlfile.read_text())
        jobinfo = XmlParser(config=config).from_path(xmlfile, JobInfo)
    else:
        if queue is not None:
            cmd.extend(["-q", queue])
        qstat_res = subprocess.run(cmd, capture_output=True, check=True, text=True)
        jobinfo = XmlParser().from_string(qstat_res.stdout, JobInfo)
    return jobinfo.queue_info[0].queue_list

run_queue_avail(user, queue, debug, slurm, sge, gpu, hide, queuefilt=None)

Generates a QueueAvail object and prints a rich.Table view.

Parameters:

Name Type Description Default
user str

Username to search the queue status for.

required
queue Optional[str]

Queue to limit the search to.

required
debug bool

Useful for development.

required
slurm bool

Show slurm queues

required
sge bool

Show sge queues

required
gpu bool

Show only slurm gpu queues

required
hide bool

Hide full queues

required
queuefilt Optional[List[str]]

Unused currently. Could be used to restrict the output. Defaults to None.

None
Source code in hpcman/queue/avail.py
def run_queue_avail(
    user: str,
    queue: Optional[str],
    debug: bool,
    slurm: bool,
    sge: bool,
    gpu: bool,
    hide: bool,
    queuefilt: Optional[List[str]] = None,
) -> None:
    """Generates a QueueAvail object and prints a rich.Table view.

    Args:
        user: Username to search the queue status for.
        queue: Queue to limit the search to.
        debug: Useful for development.
        slurm: Show slurm queues
        sge: Show sge queues
        gpu: Show only slurm gpu queues
        hide: Hide full queues
        queuefilt: Unused currently. Could be used to restrict the output. Defaults to None.
    """
    queueavail = QueueAvail(
        user=user,
        queue=queue,
        queuefilt=queuefilt,
        table=None,
        debug=debug,
        slurm=slurm,
        sge=sge,
        gpu=gpu,
        hide=hide,
    )
    queueavail.print_table()

run_sinfo_avail(queue=None, debug=False)

Runs sinfo and returns a list of queue dicts

Source code in hpcman/queue/avail.py
def run_sinfo_avail(queue: Optional[str] = None, debug: bool = False) -> List[Dict[str, Any]]:
    """Runs sinfo and returns a list of queue dicts"""
    cmd = ["sinfo", "-Nl", "--json"]
    if debug:
        jsonfile = Path("./sinfo.json")
        # rprint(xmlfile.read_text())
        with jsonfile.open("rt") as jsonfh:
            jobinfo = json.load(jsonfh)["sinfo"]
    else:
        if queue is not None:
            cmd.extend(["-p", queue])
        squeue_res = subprocess.run(cmd, capture_output=True, check=True, text=True)
        jobinfo = json.loads(squeue_res.stdout)["sinfo"]
    return jobinfo