Source code for fuzzyops.sequencing_assignment.solver

from fuzzyops.graphs.fuzzgraph import FuzzyGraph
from typing import List, Dict


[docs] class FuzzySASolver: """ Represents the Assignment Problem solver (SAS) using a fuzzy graph Attributes: _graph (FuzzyGraph): A fuzzy graph containing information about employees and tasks _workers (List[str]): List of employees _tasks (List[str]): Task list """ def __init__(self): self._graph = None self._workers = None self._tasks = None
[docs] def load_graph(self, graph: FuzzyGraph) -> None: """ Loads an empty graph with a certain fuzzy mathematical type Args: graph (FuzzyGraph): Fuzzy graph for loading Raises: Exception: If the graph is no longer empty or is not an instance of FuzzyGraph """ if not (type(graph) is FuzzyGraph): raise Exception('Can use only FuzzGraph') if (graph.get_nodes_amount() != 0) or (graph.get_edges_amount() != 0): raise Exception('Can load only empty graph') self._graph = graph
[docs] def load_tasks_workers(self, tasks: List[str], workers: List[str]) -> None: """ Loads lists of tasks and employees Args: tasks (List[str]): The list of tasks to download workers (List[str]): List of workers to upload Raises: Exception: If the graph is not loaded, or if tasks and workers are not lists """ if not self._graph: raise Exception('Upload empty FuzzGraph with `load_graph` function') if not (type(tasks) is list): raise Exception('`tasks` can be only a list') if not (type(workers) is list): raise Exception('`workers` can be only a list') self._workers = workers self._tasks = tasks # workers for i in range(len(self._workers)): self._graph.add_node() # tasks for i in range(len(self._tasks)): self._graph.add_node()
[docs] def load_task_worker_pair_value(self, task: str, worker: str, value: List[int]) -> None: """ Loads the cost of an assignment for a pair of employee and task Args: task (str): Assignment task worker (str): The employee to whom the task is assigned value (List[int]): The cost of the appointment Raises: Exception: If the list of tasks or workers is not loaded """ if (self._workers is None) or (self._tasks is None): raise Exception('Upload tasks and workers lists with `load_tasks_workers` function') if not (task in self._tasks): raise Exception('No such task') if not (worker in self._workers): raise Exception('No such worker') w = self._workers.index(worker) t = self._tasks.index(task) + len(self._workers) self._graph.add_edge(w, t, value)
[docs] def solve(self) -> Dict: """ The main function of the solution, which implements the Hungarian algorithm Returns: Dict: A dictionary with employee assignments to tasks and the total cost Raises: Exception: If the graph or the lists of workers/tasks are not loaded """ if self._graph is None: raise Exception('There is no graph loaded to solver, use `load_graph` function') if (self._workers is None) or (self._tasks is None): raise Exception('Upload tasks and workers lists with `load_tasks_workers` function') # Creating an array for storing assignments (employee indexes) assignment = [-1] * len(self._workers) # Creating an array to track busy tasks occupied_tasks = [-1] * len(self._tasks) # Iteratively improving assignments for worker in range(len(self._workers)): # List of available tasks to assign available_tasks = [task + len(self._workers) for task in range(len(self._tasks)) if occupied_tasks[task] == -1] # Trying to find an improvement for the current employee for task in available_tasks: try: cost = self._graph.get_edge_len(worker, task) except: continue if assignment[worker] == -1 or cost < self._graph.get_edge_len(worker, assignment[worker]): # We release the previous task, if it was if assignment[worker] != -1: occupied_tasks[assignment[worker] - len(self._workers)] = -1 # Assigning a new task assignment[worker] = task occupied_tasks[task - len(self._workers)] = worker # Calculating the total cost total_cost = None for worker in range(len(self._workers)): if assignment[worker] != -1: if total_cost is None: total_cost = self._graph.get_edge_len(worker, assignment[worker]) else: total_cost += self._graph.get_edge_len(worker, assignment[worker]) toRet = [] for i in range(len(self._workers)): if assignment[i] != -1: toRet.append([self._workers[i], self._tasks[assignment[i] - len(self._workers)]]) else: toRet.append([self._workers[i], 'no assignment']) return { 'assignments': toRet, 'cost': total_cost }