from collections import OrderedDict
import itertools
from typing import Union, Callable, List, Tuple
import torch
import torch.nn.functional as F
import numpy as np
from torch.utils.data import TensorDataset, DataLoader
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from .mf_funcs import make_gauss_mfs, GaussMemberFunc, BellMemberFunc, make_bell_mfs
dtype = torch.float
funcs = Union[GaussMemberFunc, BellMemberFunc]
task_types = {"classification": "classification", "regression": "regression"}
funcs_type = {"gauss": "gauss", "bell": "bell"}
class _FuzzyVar(torch.nn.Module):
"""
The class of the layer for fuzzification of input variables
Attributes:
mfdefs (torch.nn.ModuleDict): Dictionary of membership functions for fuzzification
padding (int): The padding value for matrix alignment after fuzzification
Args:
mfdefs (List[funcs]): A list of membership functions for the input variable
"""
def __init__(self, mfdefs: List[funcs]):
super(_FuzzyVar, self).__init__()
if isinstance(mfdefs, list):
mfnames = ['mf{}'.format(i) for i in range(len(mfdefs))]
mfdefs = OrderedDict(zip(mfnames, mfdefs))
self.mfdefs = torch.nn.ModuleDict(mfdefs)
self.padding = 0
@property
def num_mfs(self) -> int:
"""
Returns the number of terms for each input variable
Returns:
int: The number of terms
"""
return len(self.mfdefs)
def members(self) -> torch.nn.ModuleDict.items:
"""
Returns a fuzzy term with its membership function
Returns:
torch.nn.ModuleDict.items: Dictionary elements of fuzzy terms and membership functions
"""
return self.mfdefs.items()
def pad_to(self, new_size: int) -> None:
"""
The method sets the padding value to align the matrices after fuzzification
Args:
new_size (int): New value for padding
"""
self.padding = new_size - len(self.mfdefs)
def fuzzify(self, x: torch.Tensor):
"""
Method for fuzzification of transmitted values
Args:
x (torch.Tensor): Input values for fuzzification
Yields:
Tuple[str, torch.Tensor]: The name of the membership function and its values
"""
for mfname, mfdef in self.mfdefs.items():
yvals = mfdef(x)
yield mfname, yvals
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Performs fuzzification of the transmitted values and returns the results
Args:
x (torch.Tensor): Input values for fuzzification
Returns:
torch.Tensor: Fuzzification results, including padding, if necessary
"""
predictions = torch.cat([mf(x) for mf in self.mfdefs.values()], dim=1)
if self.padding > 0:
predictions = torch.cat([predictions, torch.zeros(x.shape[0], self.padding)], dim=1)
return predictions
class _FuzzyLayer(torch.nn.Module):
"""
A layer class for combining all fuzzy terms
Attributes:
varmfs (torch.nn.ModuleDict): Dictionary of fuzzy variables
varnames (List[str]): Names of input variables
Args:
varmfs (List[_FuzzyVar]): List of fuzzy variables
varnames (List[str], optional): Variable names (if omitted, x0, x1, etc. are used)
"""
def __init__(self, varmfs: List[_FuzzyVar], varnames=None):
super(_FuzzyLayer, self).__init__()
self.varnames = ['x{}'.format(i) for i in range(len(varmfs))] if not varnames else list(varnames)
maxmfs = max([var.num_mfs for var in varmfs])
for var in varmfs:
var.pad_to(maxmfs)
self.varmfs = torch.nn.ModuleDict(zip(self.varnames, varmfs))
@property
def num_in(self) -> int:
"""
A property that returns the number of input variables
Returns:
int: Number of input variables
"""
return len(self.varmfs)
@property
def max_mfs(self) -> int:
"""
A property that returns the maximum number of input terms among all variables
Returns:
int: Maximum number of input terms
"""
return max([var.num_mfs for var in self.varmfs.values()])
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
A method for concatenating fuzzy terms into a single tensor
Args:
x (torch.Tensor): Input values to be processed
Returns:
torch.Tensor: Concatenated tensor of fuzzy terms
Raises:
AssertionError: If the number of input values does not match the expected value
"""
assert x.shape[1] == self.num_in, \
'{} is wrong no. of input values'.format(self.num_in)
y_pred = torch.stack([var(x[:, i:i + 1])
for i, var in enumerate(self.varmfs.values())],
dim=1)
return y_pred
class _AntecedentLayer(torch.nn.Module):
"""
The class of the antecedent layer of fuzzy logic rules
This class is responsible for creating fuzzy rules using antecedents
(membership functions), which are determined by input fuzzy variables
It generates rules as the product of the values of the membership functions for
the corresponding input signals.
Attributes:
mf_indices (torch.Tensor): Indexes of membership functions for generated fuzzy rules
Args:
varlist (List[_FuzzyVar]): A list of fuzzy variables, each of which contains its own membership functions
"""
def __init__(self, varlist: List[_FuzzyVar]):
super(_AntecedentLayer, self).__init__()
mf_count = [var.num_mfs for var in varlist]
mf_indices = itertools.product(*[range(n) for n in mf_count])
self.mf_indices = torch.tensor(list(mf_indices))
def num_rules(self) -> int:
"""
The method returns the number of fuzzy rules
Returns:
int: Number of fuzzy rules
"""
return len(self.mf_indices)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Generates the antecedents of the corresponding rule and calculates the degrees of rule fulfillment
Each rule is determined by the product of the values of the membership functions
associated with the input signals.
Args:
x (torch.Tensor): Input values containing the results of fuzzification of variables,
expected dimensions (batch_size, num_mfs, feature_size)
Returns:
torch.Tensor: The degree of rule fulfillment for fuzzy rules,
and the dimensions (batch_size, num_rules)
"""
batch_indices = self.mf_indices.expand((x.shape[0], -1, -1)).to(x.device)
ants = torch.gather(x.transpose(1, 2), 1, batch_indices)
rules = torch.prod(ants, dim=2)
return rules
class _ConsequentLayer(torch.nn.Module):
"""
The class of the fuzzy logic sequent layer
This class is responsible for calculating the output values of a fuzzy system
based on the set rules and input data. It includes
coefficients (weights) that are used to linearly combine
inputs to produce totals
Attributes:
_coeff (torch.Tensor): Layer parameters representing weights
for a linear combination of input data
Args:
d_in (int): The dimension of the input data
d_rule (int): Number of fuzzy rules
d_out (int): The dimension of the output data
Properties:
coeff() -> torch.Tensor:
Returns coefficients (weights) layer
"""
def __init__(self, d_in: int, d_rule: int, d_out: int):
super(_ConsequentLayer, self).__init__()
c_shape = torch.Size([d_rule, d_out, d_in + 1])
self._coeff = torch.zeros(c_shape, dtype=dtype, requires_grad=True)
self.register_parameter('coefficients',
torch.nn.Parameter(self._coeff))
@property
def coeff(self) -> torch.Tensor:
"""
A property that returns the weights of the layer
Returns:
torch.Tensor: Current coefficients (weights) layer
"""
return self.coefficients
@coeff.setter
def coeff(self, new_coeff: torch.Tensor) -> None:
"""
Setter for setting new weights
Args:
new_coeff (torch.Tensor): New coefficients for the layer
Raises:
AssertionError: If the shape of the new coefficients does not match the shape of the current coefficients
"""
assert new_coeff.shape == self.coeff.shape, \
'Coeff shape should be {}, but is actually {}' \
.format(self.coeff.shape, new_coeff.shape)
self._coeff = new_coeff
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Calculates output values based on input data and weights
The method adds a unit offset to the input data,
and then performs a matrix multiplication of the weights by the input data
to obtain the predicted output values
Args:
x (torch.Tensor): Input data having dimension (batch_size, d_in)
Returns:
torch.Tensor: Output values having dimension (batch_size, d_out)
"""
x_plus = torch.cat([x, torch.ones(x.shape[0], 1, device=x.device)], dim=1)
y_pred = torch.matmul(self.coeff, x_plus.t())
return y_pred.transpose(0, 2)
class _NN(torch.nn.Module):
"""
A class of fuzzy neural network that combines fuzzy rules and linear models
This class implements a fuzzy neural network consisting of three main layers:
1. The fuzzification layer of input variables
2. A layer of antecedents for forming rules
3. A consequence layer for calculating output values based on rules
Attributes:
outvarnames (List[str]): Names of output variables
num_in (int): The number of input variables
num_rules (int): The total number of fuzzy rules
layer (torch.nn.ModuleDict): A dictionary of network layers, including layers of fuzzification,
antecedents, and consequences
Args:
invardefs (List[Tuple[str, List[funcs]]]): A list of tuples,
where each tuple consists of the name of the input variable and
a list of membership functions for that variable
outvarnames (List[str]): A list of names of output variables
Properties:
num_out() -> int:
Returns the number of output variables
coeff() -> torch.Tensor:
Returns coefficients of the impact layer
"""
def __init__(self, invardefs: List[Tuple[str, List[funcs]]],
outvarnames: List[str]):
super(_NN, self).__init__()
self.outvarnames = outvarnames
varnames = [v for v, _ in invardefs]
mfdefs = [_FuzzyVar(mfs) for _, mfs in invardefs]
self.num_in = len(invardefs)
self.num_rules = np.prod([len(mfs) for _, mfs in invardefs])
self.layer = torch.nn.ModuleDict(OrderedDict([
('fuzzify', _FuzzyLayer(mfdefs, varnames)),
('rules', _AntecedentLayer(mfdefs)),
('consequent', _ConsequentLayer(self.num_in, self.num_rules, self.num_out)),
]))
@property
def num_out(self) -> int:
"""
Returns the number of output variables
Returns:
int: Number of output variables
"""
return len(self.outvarnames)
@property
def coeff(self) -> torch.Tensor:
"""
Returns coefficients of the sequence layer
Returns:
torch.Tensor: Current coefficients of the sequence layer
"""
return self.layer['consequent'].coeff
@coeff.setter
def coeff(self, new_coeff: torch.Tensor) -> None:
"""
A setter for setting new coefficients
Args:
new_coeff (torch.Tensor): New coefficients for the sequence layer
"""
self.layer['consequent'].coeff = new_coeff
def fit_coeff(self, x: torch.Tensor, y_actual: torch.Tensor) -> None:
"""
A method for learning the weights (coefficients) of the consequence layer
Args:
x (torch.Tensor): Input data used for training
y_actual (torch.Tensor): The actual output data to compare the predictions with
"""
pass
def input_variables(self) -> torch.nn.ModuleDict.items:
"""
Returns fuzzy input variables and their membership functions
Returns:
torch.nn.ModuleDict.items: Dictionary elements of fuzzy variables and membership functions
"""
return self.layer['fuzzify'].varmfs.items()
def output_variables(self) -> List[str]:
"""
Returns the names of the output variables
Returns:
List[str]: Names of output variables
"""
return self.outvarnames
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Performs forward propagation and returns the predicted output values
The input data is transmitted through the fuzzification layer, then processed
in the antecedent layer to calculate the degrees of rule fulfillment, and finally
used in the consequences layer to obtain the final output values
Args:
x (torch.Tensor): Input values having dimension (batch_size, num_in)
Returns:
torch.Tensor: Predicted output values having dimension (batch_size, num_out)
"""
self.fuzzified = self.layer['fuzzify'](x)
self.raw_weights = self.layer['rules'](self.fuzzified)
self.weights = F.normalize(self.raw_weights, p=1, dim=1)
self.rule_tsk = self.layer['consequent'](x)
y_pred = torch.bmm(self.rule_tsk, self.weights.unsqueeze(2))
self.y_pred = y_pred.squeeze(2)
return self.y_pred
[docs]
class Model:
"""
A class for creating and training a fuzzy logic model
This class is designed to perform regression and classification tasks using
fuzzy logic. It accepts input data, defines model parameters, and
performs preprocessing of the data
Attributes:
X (np.ndarray): A matrix of features from a data sample
Y (np.ndarray): The vector of the target variable from the data sample
n_input_features (int): Number of input features
n_terms (List[int]): A list containing the number of terms for each input feature
n_out_vars (int): Number of output variables
lr (float): The learning step for optimization
task_type (str): Task type ("regression" или "classification")
batch_size (int): The size of the subsample for training
member_func_type (str): Type of membership function ('gauss' - Gaussian membership function
'bell' is a generalized bell function)
device (torch.device): The device on which the model will be executed (for example, "cpu" or "cuda")
epochs (int): The number of epochs for training the model
scores (list): A list for saving model metrics
verbose (bool): The flag for the "detailed" output of information about the learning process
model (torch.nn.Module): The training model is currently undefined
Args:
X (np.ndarray): Input data for the model
Y (np.ndarray): Target values for the model
n_terms (list[int]): The number of terms for each input variable
n_out_vars (int): The number of output variables
lr (float): The learning step
task_type (str): Task type ("regression" или "classification")
batch_size (int): The size of the subsample for training
member_func_type (str): Type of membership function ('gauss' - Gaussian membership function
'bell' - generalized bell function)
epochs (int): The number of epochs for training the model
verbose (bool): The output detail level (False by default)
device (str): Computing device ('cpu', 'cuda'), default "cpu"
"""
def __init__(self, X: np.ndarray, Y: np.ndarray,
n_terms: list[int], n_out_vars: int, lr: float,
task_type: str,
batch_size: int, member_func_type: str,
epochs: int,
verbose: bool = False,
device: str = "cpu"):
self.X = X
self.Y = Y
self.n_input_features = X.shape[1]
self.n_terms = n_terms
self.n_out_vars = n_out_vars
self.lr = lr
self.task_type = task_type
self.batch_size = batch_size
self.member_func_type = member_func_type
self.device = torch.device(device)
self.epochs = epochs
self.scores = []
self.verbose = verbose
self.model = None
print(f"Creating an instance of the class" \
f"with the following hyperparameters\nNumber of input features: {self.n_input_features}\n" \
f"Number of terms: {self.n_terms}\nNumber of output variables: {self.n_out_vars}\n" \
f"The learning step: {self.lr}\nSubsample size: {self.batch_size}\n" \
f"Type of membership function: {self.member_func_type}\n" \
f"The size of the subsample for training: {self.batch_size}\n")
def __str__(self):
return f"Creating an instance of the class" \
f"with the following hyperparameters\nNumber of input features: {self.n_input_features}\n" \
f"Number of terms: {self.n_terms}\nNumber of output variables: {self.n_out_vars}\n" \
f"The learning step: {self.lr}\nSubsample size: {self.batch_size}\n" \
f"Type of membership function: {self.member_func_type}\n" \
f"The size of the subsample for training: {self.batch_size}\n"
def __repr__(self):
return f"Creating an instance of the class" \
f"with the following hyperparameters\nNumber of input features: {self.n_input_features}\n" \
f"Number of terms: {self.n_terms}\nNumber of output variables: {self.n_out_vars}\n" \
f"The learning step: {self.lr}\nSubsample size: {self.batch_size}\n" \
f"Type of membership function: {self.member_func_type}\n" \
f"The size of the subsample for training: {self.batch_size}\n"
def __preprocess_data(self) -> DataLoader:
"""
Preprocessing the data and creating a DataLoader
Converts input data and target values into tensors,
encodes output variables for classification
and creates a DataLoader object to provide data in batches.
Returns:
DataLoader: A DataLoader object containing preprocessed data.
"""
x = torch.Tensor(self.X)
if self.device:
x = x.to(self.device)
y = torch.Tensor(self.Y)
if self.device:
y = y.to(self.device)
dataset = TensorDataset(x, y)
return DataLoader(dataset, batch_size=self.batch_size, shuffle=False)
def __gauss_func(self, x: torch.Tensor) -> Tuple[List]:
"""
Generates parameters for Gaussian membership functions based on the input data
Calculates minima, maxima, and ranges for each input variable
and creates cents and sigma for Gaussian membership functions
Args:
x (torch.Tensor): Input data for which membership functions will be created
Returns:
Tuple[List]: A list of parameters of input variables and their corresponding membership functions
"""
input_num = x.shape[1]
min_values, _ = torch.min(x, dim=0)
max_values, _ = torch.max(x, dim=0)
ranges = max_values - min_values
input_vars = []
for i in range(input_num):
sigma = ranges[i] / self.n_terms[i]
mu_list = torch.linspace(min_values[i], max_values[i], self.n_terms[i]).tolist()
name = 'x{}'.format(i)
input_vars.append((name, make_gauss_mfs(sigma, mu_list)))
out_vars = ['y{}'.format(i) for i in range(self.n_out_vars)]
return input_vars, out_vars
def __bell_func(self, x: torch.Tensor) -> Tuple[List]:
"""
Generates parameters for bell-shaped membership functions based on the input data
Calculates the minima and maxima for each input variable and creates parameters
for bell-shaped membership functions
Args:
x (torch.Tensor): Input data for which membership functions will be created
Returns:
Tuple[List]: A tuple containing a list of parameters of input variables and their
corresponding membership functions
"""
input_num = x.shape[1]
min_values, _ = torch.min(x, dim=0)
max_values, _ = torch.max(x, dim=0)
input_vars = []
for i in range(input_num):
a, b = min_values / self.n_terms[i], max_values / self.n_terms[i]
c_list = torch.linspace(min_values[i], max_values[i], self.n_terms[i]).tolist()
name = 'x{}'.format(i)
input_vars.append((name, make_bell_mfs(a, b, c_list)))
out_vars = ['y{}'.format(i) for i in range(self.n_out_vars)]
return input_vars, out_vars
def __compile(self, x: torch.Tensor) -> _NN:
"""
Compiles a fuzzy neural network model based on the selected type of membership function
Calls methods to generate membership functions and creates an instance of the model
`_NN`. Transfers the model to the specified device (CPU or GPU)
Args:
x (torch.Tensor): The input data on the basis of which the model will be compiled
Returns:
_NN: An instance of a fuzzy neural network
"""
input_vars, out_vars = self.__gauss_func(x) if self.member_func_type == funcs_type[
"gauss"] else self.__bell_func(x)
model = _NN(input_vars, out_vars)
if self.device:
model.to(self.device)
return model
@staticmethod
def __class_criterion(inp: torch.Tensor, target: torch.Tensor) -> float:
"""
Calculates the value of the loss function for the classification task
Uses cross-entropy to determine the difference between predicted
and actual class labels
Args:
inp (torch.Tensor): The predicted values of the model
target (torch.Tensor): The actual class labels
Returns:
float: The value of the loss function
"""
return torch.nn.CrossEntropyLoss()(inp, target.squeeze().long())
@staticmethod
def __reg_criterion(inp: torch.Tensor, target: torch.Tensor) -> float:
"""
Вычисляет значение функции потерь для задачи регрессии.
Использует среднеквадратичную ошибку (MSE) для определения разницы между
предсказанными и фактическими значениями.
Args:
inp (torch.Tensor): Предсказанные значения модели.
target (torch.Tensor): Фактические значения.
Returns:
float: Значение функции потерь.
"""
return torch.nn.MSELoss()(inp, target.squeeze())
@staticmethod
def __calc_reg_score(preds: torch.Tensor, y_actual: torch.Tensor) -> float:
"""
Вычисляет оценку модели для задачи регрессии.
Определяет среднеквадратичную ошибку между предсказанными и фактическими значениями.
Args:
preds (torch.Tensor): Предсказанные значения модели.
y_actual (torch.Tensor): Фактические значения.
Returns:
float: Среднеквадратичная ошибка.
"""
with torch.no_grad():
tot_loss = F.mse_loss(preds, y_actual)
return tot_loss
@staticmethod
def __calc_class_score(preds: torch.Tensor, y_actual: torch.Tensor, x: torch.Tensor) -> float:
"""
Calculates the accuracy of the model for the classification task
Determines the percentage of correct predictions among all input data
Args:
preds (torch.Tensor): The predicted values of the model
y_actual (torch.Tensor): The actual class labels
x (torch.Tensor): Input values
Returns:
float: The percentage of correct predictions
"""
with torch.no_grad():
corr = torch.sum(y_actual.squeeze().long() == torch.argmax(preds, dim=1))
total = len(x)
return corr * 100 / total
def __train_loop(self, data: DataLoader, model: _NN,
criterion: Callable, calc_score: Callable,
optimizer: torch.optim.Adam) -> None:
"""
The main training cycle of the model
Trains the model on the data, updates the weights, and tracks
the model's performance during training
Args:
data (DataLoader): Loader of training data
model (_NN): A fuzzy neural network model
criterion (Callable): The loss function used for training
calc_score (Callable): A function for evaluating the model
optimizer (torch.optim.Adam): An optimizer for updating model weights
"""
score_class = 0
score_reg = 100000000000
for t in range(self.epochs):
for x, y_actual in data:
y_pred = model(x)
loss = criterion(y_pred, y_actual)
optimizer.zero_grad()
loss.backward()
optimizer.step()
x, y_actual = data.dataset.tensors
y_pred = model(x)
score = calc_score(y_pred, y_actual) if self.task_type == "regression" \
else calc_score(y_pred, y_actual, x)
if self.task_type == "regression":
if score < score_reg:
self.model = model
else:
if score > score_class:
self.model = model
self.scores.append(score)
if self.epochs < 30 or t % 10 == 0:
if self.verbose:
print(f"epoch: {t}, score: {score}")
[docs]
def train(self) -> _NN:
"""
Starts the learning process of the model
Performs data preprocessing, model compilation, and
training cycle execution using the specified criteria and optimizer
Returns:
_NN: A trained model of a fuzzy neural network
"""
train_data = self.__preprocess_data()
x, y = train_data.dataset.tensors
model = self.__compile(x)
optimizer = torch.optim.Adam(model.parameters(), lr=self.lr)
criterion = self.__class_criterion if self.task_type == task_types["classification"] else \
self.__reg_criterion
calc_error = self.__calc_class_score if self.task_type == task_types["classification"] else \
self.__calc_reg_score
self.__train_loop(train_data, model, criterion, calc_error, optimizer)
return self.model
[docs]
def save_model(self, path: str) -> None:
"""
Saves the state of the trained model to a file
Saves the model parameters using the specified path
Args:
path (str): The path to the file where the model will be saved
Raises:
Exception: If the model has not been trained
"""
if self.model:
torch.save(self.model.state_dict(), path)
else:
raise Exception("The model is not trained")
[docs]
def process_csv_data(path: str,
target_col: str,
n_features: int,
use_label_encoder: bool,
drop_index: bool,
split_size: float = 0.2,
use_split: bool = False) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray],
Tuple[np.ndarray, np.ndarray]]:
"""
An additional function for data preprocessing with the possibility of dividing the sample into train, test
Args:
path (str): The path to the data
target_col (str): The name of the target column
n_features (int): The number of input attributes
use_label_encoder (bool): True - use encoding of input features (if they are specified as a string),
False - no
drop_index (bool): True - delete the column with indexes, False - no
split_size (float): The size of the test subsample depends on the size of the entire dataset
use_split (bool): Use division into train, test
Returns:
Union[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray],
Tuple[np.ndarray, np.ndarray]]: Preprocessed data
"""
df = pd.read_csv(path)
Y = df[target_col]
X = df.drop(target_col, axis=1)
if drop_index:
X = X.drop(X.columns[0], axis=1)
if use_split:
X_train, X_test, Y_train, Y_test = train_test_split(X, Y,
test_size=split_size)
new_Y_train = Y_train.values
new_Y_test = Y_test.values
new_X_train = X_train.values[:, :n_features]
new_X_test = X_test.values[:, :n_features]
if use_label_encoder:
le = LabelEncoder()
y_train = le.fit_transform(new_Y_train)
y_test = le.fit_transform(new_Y_test)
else:
y_train = new_Y_train
y_test = new_Y_test
return new_X_train, new_X_test, y_train, y_test
else:
new_Y = Y.values
new_X = X.values[:, :n_features]
if use_label_encoder:
le = LabelEncoder()
y = le.fit_transform(new_Y)
else:
y = new_Y
return new_X, y