import heapq
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import seaborn as sns
from matplotlib.patches import Circle, FancyArrowPatch
from grafs_e.donnees import *
[docs]
class Graphs_maker:
def __init__(self):
from grafs_e.N_class import DataLoader
self.data = DataLoader()
[docs]
def get_graph(self, region, year, cut_isolated=True):
from grafs_e.N_class import NitrogenFlowModel
nitrogen_model = NitrogenFlowModel(
data=self.data,
year=year,
region=region,
categories_mapping=categories_mapping,
labels=labels_init,
cultures=cultures,
legumineuses=legumineuses,
prairies=prairies,
betail=betail,
Pop=Pop,
ext=ext,
)
adjacency_matrix = nitrogen_model.get_transition_matrix()
# Convertir la matrice pondérée en un graphe orienté pondéré
G = nx.from_numpy_array(adjacency_matrix, create_using=nx.DiGraph)
# Ajouter un attribut 'index' pour conserver l'indexation d'origine
for node in G.nodes:
G.nodes[node]["index"] = node # Stocker l'indice d'origine du nœud
# Ajouter les poids aux arêtes
for i, j in zip(*np.nonzero(adjacency_matrix)):
G[i][j]["weight"] = adjacency_matrix[i, j]
if cut_isolated:
# Supprimer les nœuds sans arêtes entrantes et sortantes
isolated_nodes = [node for node in G.nodes if G.in_degree(node) == 0 and G.out_degree(node) == 0]
G.remove_nodes_from(isolated_nodes)
return G
[docs]
def get_matrix(self, year, region):
from grafs_e.N_class import NitrogenFlowModel
nitrogen_model = NitrogenFlowModel(
data=self.data,
year=year,
region=region,
categories_mapping=categories_mapping,
labels=labels_init,
cultures=cultures,
legumineuses=legumineuses,
prairies=prairies,
betail=betail,
Pop=Pop,
ext=ext,
)
return nitrogen_model.get_transition_matrix()
[docs]
class GraphVisualizer:
def __init__(self, G):
self.G = G
self.pos = None # Positions des nœuds
self.node_colors = None
self.label_to_index = label_to_index # Mapping des labels vers les indices de nœuds
self.index_to_label = {v: k for k, v in label_to_index.items()}
[docs]
def draw_curved_edges(self, pos, ax, alpha=0.7, edge_color="white"):
for edge in self.G.edges():
src, dst = edge
rad = 0.2 # Ajustez ce paramètre pour contrôler la courbure
arrowprops = dict(arrowstyle="-", color=edge_color, lw=0.5, alpha=alpha, connectionstyle=f"arc3,rad={rad}")
ax.annotate("", xy=pos[dst], xycoords="data", xytext=pos[src], textcoords="data", arrowprops=arrowprops)
[docs]
def draw_curved_edges_log(self, pos, ax, alpha=0.1, edge_color="white"):
min_weight = 1e-2 # Plage min pour l'échelle logarithmique
max_weight = 1e2 # Plage max pour l'échelle logarithmique
for edge in self.G.edges(data=True):
src, dst, data = edge
weight = data["weight"]
# Appliquer l'échelle logarithmique
log_weight = np.log10(max(weight, min_weight)) # Éviter les poids nuls
normalized_weight = (log_weight - np.log10(min_weight)) / (np.log10(max_weight) - np.log10(min_weight))
# Définir l'épaisseur des arêtes sur une plage adaptée (par exemple 0.5 à 5)
line_width = 0.5 + normalized_weight * 4.5 # Épaisseur min 0.5, max 5
rad = 0.2 # Courbure des arêtes
arrowprops = dict(
arrowstyle="-|>", # Représenter la direction avec une flèche
color=edge_color,
lw=line_width, # Utiliser l'épaisseur ajustée
alpha=alpha,
connectionstyle=f"arc3,rad={rad}",
)
ax.annotate("", xy=pos[dst], xycoords="data", xytext=pos[src], textcoords="data", arrowprops=arrowprops)
[docs]
def draw_curved_edges_fancy(self, pos, ax, alpha=0.7):
min_weight = 1e-2
max_weight = 1e2
# Obtenir la liste des poids des arêtes
weights = [self.G[u][v]["weight"] for u, v in self.G.edges()]
# Normaliser les poids pour l'échelle logarithmique
norm = mcolors.LogNorm(vmin=min_weight, vmax=max_weight)
cmap = plt.cm.rainbow # Choix de la palette de couleurs adaptée à un fond noir
for edge in self.G.edges(data=True):
src, dst, data = edge
weight = data["weight"]
# Appliquer l'échelle logarithmique
log_weight = np.log10(max(weight, min_weight)) # Éviter les poids nuls
normalized_weight = (log_weight - np.log10(min_weight)) / (np.log10(max_weight) - np.log10(min_weight))
# Épaisseur de l'arête en fonction du poids
line_width = 0.5 + normalized_weight * 4.5 # Épaisseur min 0.5, max 5
# Couleur de l'arête en fonction du poids
edge_color = cmap(norm(weight))
rad = 0.2 # Courbure des arêtes
# Créer un patch courbé avec la couleur définie par le poids
arrow = FancyArrowPatch(
posA=pos[src],
posB=pos[dst],
connectionstyle=f"arc3,rad={rad}",
arrowstyle="-|>",
color=edge_color,
lw=line_width,
alpha=alpha,
mutation_scale=10,
)
ax.add_patch(arrow)
return cmap, norm
[docs]
def draw_nodes_with_patches(self, pos, ax, node_size=300, node_colors=None):
for i, (node, (x, y)) in enumerate(pos.items()):
# Dessiner les cercles avec `plt.Circle`
circle = Circle(
(x, y), radius=node_size / 2000, color=node_colors[i], zorder=2
) # zorder=2 pour être au-dessus des arêtes
ax.add_patch(circle)
[docs]
def regroup_nodes_layout(self, pos, node_colors, group_colors, ecart=0.05):
# Récupérer les positions initiales
new_pos = pos.copy()
group_centers = {} # Pour mémoriser les centres des groupes
# Grouper les nœuds par couleur
for color in group_colors:
# Récupérer les nœuds appartenant à ce groupe
group_nodes = [node for node, color_node in zip(self.G.nodes, node_colors) if color_node == color]
# Calculer le barycentre des positions actuelles pour ces nœuds
if group_nodes:
avg_x = np.mean([pos[node][0] for node in group_nodes])
avg_y = np.mean([pos[node][1] for node in group_nodes])
group_centers[color] = (avg_x, avg_y)
# Réassigner les positions proches du centre de groupe
for node in group_nodes:
new_pos[node] = (avg_x + np.random.uniform(-ecart, ecart), avg_y + np.random.uniform(-ecart, ecart))
# Placer les nœuds gris vers l'extérieur
for node, color in zip(self.G.nodes, node_colors):
if color == "gray":
# Calculer une direction vers l'extérieur (rayon plus large)
new_pos[node] = (new_pos[node][0] * 1, new_pos[node][1] * 1) # Ajustez si nécessaire
return new_pos
[docs]
def color_nodes(self, cultures, legumineuses, prairies, betail, Pop):
n, m, k, p, q = (
len(cultures),
len(legumineuses),
len(prairies),
len(betail),
len(Pop),
) # Nombre de nœuds à colorier
# Assigner une couleur en fonction de l'index d'origine
for node in self.G.nodes:
original_index = self.G.nodes[node]["index"] # Récupérer l'index d'origine
if original_index < n:
self.G.nodes[node]["color"] = "#FFA500" # Orange doux pour Cultures
elif original_index < n + m:
self.G.nodes[node]["color"] = "#66C2A5" # Vert clair pour Légumineuses
elif original_index < n + m + k:
self.G.nodes[node]["color"] = "#FC8D62" # Rouge doux pour Prairies
elif original_index < n + m + k + p:
self.G.nodes[node]["color"] = "#8DA0CB" # Bleu pastel pour Bétail
elif original_index < n + m + k + p + q:
self.G.nodes[node]["color"] = "#E5E5E5" # Blanc cassé pour Population
else:
self.G.nodes[node]["color"] = "#BFBFBF" # Gris pour les autres
# Stocker les couleurs des nœuds dans une liste
self.node_colors = [self.G.nodes[node]["color"] for node in self.G.nodes()]
[docs]
def plot_graph_base(self):
if self.node_colors is None:
# Couleurs par défaut si non assignées
self.node_colors = ["gray"] * self.G.number_of_nodes()
# Calculer les positions des nœuds avec `nx.spring_layout`
self.pos = nx.spring_layout(self.G, k=0.8)
# Appliquer `regroup_nodes_layout` pour regrouper les nœuds par couleur et éloigner les gris
group_colors = ["yellow", "green", "red", "blue", "white"] # Couleurs de groupe à regrouper
self.pos = self.regroup_nodes_layout(self.pos, self.node_colors, group_colors, ecart=0.1)
# Dessiner le graphe
fig, ax = plt.subplots(figsize=(16, 12), dpi=500, facecolor="black")
ax.set_facecolor("black")
ax.set_aspect("equal")
# Dessiner les arêtes courbées avec `FancyArrowPatch`
cmap, norm = self.draw_curved_edges_fancy(self.pos, ax, alpha=0.2)
# Dessiner les nœuds avec `plt.Circle` pour qu'ils apparaissent au-dessus des arêtes
self.draw_nodes_with_patches(self.pos, ax, node_size=20, node_colors=self.node_colors)
# Ajuster les limites des axes pour éviter que le graphe soit coupé
x_values, y_values = zip(*self.pos.values())
ax.set_xlim(min(x_values) - 0.1, max(x_values) + 0.1)
ax.set_ylim(min(y_values) - 0.1, max(y_values) + 0.1)
sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
sm.set_array([]) # Nécessaire pour que ScalarMappable fonctionne avec colorbar
cbar = plt.colorbar(sm, ax=ax, orientation="vertical")
cbar.set_label("Poids des arêtes")
# Modifier la couleur du label de la colorbar en blanc
cbar.set_label("Poids des arêtes", color="white")
# Modifier la couleur des graduations (ticks) de la colorbar en blanc
cbar.ax.yaxis.set_tick_params(color="white")
# Modifier la couleur des étiquettes de la colorbar en blanc
plt.setp(plt.getp(cbar.ax, "yticklabels"), color="white")
# Désactiver les axes
plt.axis("off")
plt.show()
[docs]
def ajouter_retour_ligne_si_long(self, text, longueur_max):
# Fonction pour ajouter un retour à la ligne si le texte est trop long
words = text.split()
lines = []
current_line = ""
for word in words:
if len(current_line + " " + word) <= longueur_max:
current_line += " " + word
else:
lines.append(current_line.strip())
current_line = word
lines.append(current_line.strip())
return "\n".join(lines)
[docs]
def draw_custom_networkx_edges(
self,
subgraph,
pos,
edgelist=None,
width=1,
edge_color="blue",
arrows=True,
arrowstyle="->",
arrowsize=15,
node_size=300,
):
"""
Dessine des arêtes avec des flèches ajustées pour qu'elles se terminent à la périphérie des nœuds.
:param subgraph: Le sous-graphe NetworkX
:param pos: Positions des nœuds calculées par un layout (ex: nx.spring_layout)
:param edgelist: Liste des arêtes à dessiner (par défaut, toutes les arêtes du sous-graphe)
:param width: Largeur des arêtes (peut être une liste correspondant à chaque arête)
:param edge_color: Couleur des arêtes
:param arrows: Afficher des flèches ou non
:param arrowstyle: Style des flèches ('->', '-|>', etc.)
:param arrowsize: Taille des flèches
:param node_size: Taille des nœuds pour ajuster les points de début et de fin des arêtes
"""
ax = plt.gca() # Récupérer l'axe actuel de la figure
if edgelist is None:
edgelist = list(subgraph.edges())
node_radius = np.sqrt(node_size) / 110 # Calculer le rayon approximatif des nœuds en fonction de leur taille
# Si width est une seule valeur, transformer en liste pour compatibilité
if not isinstance(width, list):
width = [width] * len(edgelist)
for i, (u, v) in enumerate(edgelist):
x1, y1 = pos[u]
x2, y2 = pos[v]
# Calculer l'angle de la direction de l'arête
angle = np.arctan2(y2 - y1, x2 - x1)
# Décaler les points de départ et d'arrivée de l'arête pour éviter les chevauchements avec les nœuds
x1_new = x1 + node_radius * np.cos(angle)
y1_new = y1 + node_radius * np.sin(angle)
x2_new = x2 - node_radius * np.cos(angle)
y2_new = y2 - node_radius * np.sin(angle)
# Dessiner une flèche avec la tête à l'extérieur du nœud
arrow = FancyArrowPatch(
(x1_new, y1_new),
(x2_new, y2_new),
connectionstyle="arc3,rad=0.0",
color=edge_color,
lw=width[i],
arrowstyle=arrowstyle,
mutation_scale=arrowsize,
)
ax.add_patch(arrow)
[docs]
def afficher_et_visualiser_10_noeuds_plus_proches(self, noeud_depart_label):
"""
Affiche les 10 nœuds les plus proches du nœud de départ, leur flux total,
et visualise le sous-graphe correspondant en excluant les nœuds intermédiaires spécifiés.
L'épaisseur des liens est proportionnelle au flux (avec échelle logarithmique).
:param noeud_depart_label: Label du nœud de départ
"""
# Vérifier que le mapping des labels est disponible
if self.label_to_index is None or self.index_to_label is None:
print("Le mapping des labels vers les indices n'est pas défini.")
return
# Vérifier que le nœud de départ existe dans le dictionnaire
if noeud_depart_label not in self.label_to_index:
print(f"Nœud de départ '{noeud_depart_label}' introuvable dans label_to_index.")
return
index_depart = self.label_to_index[noeud_depart_label]
ext_nodes = [self.label_to_index[label] for label in ext if label in self.label_to_index]
exclude_nodes_set = set(ext_nodes)
exclude_nodes_set.discard(index_depart) # Autoriser le nœud de départ même s'il est dans ext_nodes
# Calculer les distances et les chemins en utilisant l'algorithme de Dijkstra personnalisé
analyzer = GraphAnalyzer(self.G, self.label_to_index)
distances, paths = analyzer.dijkstra_exclusion(index_depart, weight="weight", exclude_nodes=exclude_nodes_set)
# Enlever le nœud de départ des résultats
distances.pop(index_depart, None)
paths.pop(index_depart, None)
if not distances:
print(f"Aucun nœud accessible depuis '{noeud_depart_label}' sans passer par les nœuds exclus.")
return
# Calculer le flux total pour chaque chemin (Option 1: utiliser 1/distance)
flux_totaux = {target: 1 / distance for target, distance in distances.items()}
# Trier les nœuds cibles en fonction du flux total (ordre décroissant)
flux_trie = sorted(flux_totaux.items(), key=lambda x: x[1], reverse=True)
top_10 = flux_trie[:10]
# Mapper les index vers les labels
index_to_label = self.index_to_label
# Afficher les résultats
print(
f"Les 10 nœuds les plus proches de '{noeud_depart_label}' (en excluant les nœuds intermédiaires spécifiés) :"
)
for i, (index_noeud, flux_total) in enumerate(top_10, 1):
label_noeud = index_to_label.get(index_noeud, f"Nœud {index_noeud}")
print(f"{i}. {label_noeud} - Flux total: {flux_total:.2f}")
# Construire le sous-graphe
nodes_in_subgraph = set()
edges_in_subgraph = []
for index_noeud in [index for index, flux in top_10]:
if index_noeud in paths:
path = paths[index_noeud]
nodes_in_subgraph.update(path)
edges_in_subgraph.extend([(path[i], path[i + 1]) for i in range(len(path) - 1)])
nodes_in_subgraph.add(index_depart)
# Créer le sous-graphe
subgraph = self.G.subgraph(nodes_in_subgraph).edge_subgraph(edges_in_subgraph).copy()
# Préparer les labels
labels = {index: index_to_label.get(index, str(index)) for index in subgraph.nodes()}
# Visualisation
plt.figure(figsize=(12, 8), dpi=500)
ax = plt.gca()
# Positionnement des nœuds
pos = nx.spring_layout(subgraph, pos=nx.planar_layout(subgraph))
# Couleurs des nœuds
node_colors = []
for node in subgraph.nodes():
if node == index_depart:
node_colors.append("red") # Nœud de départ en rouge
elif node in [index for index, flux in top_10]:
node_colors.append("lightblue") # Top 10 nœuds en bleu clair
else:
node_colors.append("gray") # Nœuds intermédiaires en gris
# Dessiner les nœuds avec labels
nx.draw_networkx_nodes(subgraph, pos, node_color=node_colors, node_size=6000)
# Ajouter les étiquettes avec un fond de la même couleur que les nœuds
for node, (x, y) in pos.items():
label = self.ajouter_retour_ligne_si_long(labels[node], 10)
if label == noeud_depart_label:
node_color = "red"
else:
node_color = "lightblue"
ax.annotate(
label,
(x, y),
xytext=(0, 0),
textcoords="offset points",
ha="center",
va="center",
fontsize=10,
fontweight="bold",
bbox=dict(boxstyle="square,pad=0.3", facecolor=node_color, edgecolor=node_color, alpha=0),
)
# Dessiner les arêtes en fonction du flux (poids) avec des épaisseurs proportionnelles (échelle logarithmique)
edges_with_flux = list(subgraph.edges())
flux = [self.G[u][v]["weight"] for u, v in edges_with_flux]
# Échelle logarithmique pour l'épaisseur des arêtes
flux_scaled = [np.log(f + 1) for f in flux] # Ajouter 1 pour éviter log(0)
max_flux_scaled = max(flux_scaled)
edge_widths = [2 + (f / max_flux_scaled) * 10 for f in flux_scaled] # Épaisseur entre 2 et 12
# Dessiner les arêtes avec la méthode de la classe
self.draw_custom_networkx_edges(
subgraph,
pos,
edgelist=edges_with_flux,
width=edge_widths,
edge_color="blue",
arrows=True,
arrowstyle="->",
arrowsize=50,
node_size=300,
)
# Annoter les arêtes avec les flux
edge_labels = {(u, v): f"{self.G[u][v]['weight']:.2f}" for u, v in subgraph.edges()}
nx.draw_networkx_edge_labels(
subgraph, pos, edge_labels=edge_labels, font_color="black", font_weight="bold", font_size=8
)
plt.axis("off")
plt.margins(x=0.1, y=0.1)
plt.show()
[docs]
def afficher_et_visualiser_10_noeuds_plus_proches_2(self, noeud_depart_label):
"""
Affiche les 10 nœuds les plus proches du nœud de départ, leur flux total,
et visualise le sous-graphe correspondant en excluant les nœuds intermédiaires spécifiés.
L'épaisseur des liens est proportionnelle au flux (avec échelle logarithmique).
:param noeud_depart_label: Label du nœud de départ
"""
# Vérifier que le mapping des labels est disponible
if self.label_to_index is None or self.index_to_label is None:
print("Le mapping des labels vers les indices n'est pas défini.")
return
# Vérifier que le nœud de départ existe dans le dictionnaire
if noeud_depart_label not in self.label_to_index:
print(f"Nœud de départ '{noeud_depart_label}' introuvable dans label_to_index.")
return
index_depart = self.label_to_index[noeud_depart_label]
ext_nodes = [self.label_to_index[label] for label in ext if label in self.label_to_index]
exclude_nodes_set = set(ext_nodes)
exclude_nodes_set.discard(index_depart) # Autoriser le nœud de départ même s'il est dans ext_nodes
# Calculer les distances et les chemins en utilisant l'algorithme de Dijkstra personnalisé
analyzer = GraphAnalyzer(self.G, self.label_to_index)
distances, paths = analyzer.dijkstra_exclusion(index_depart, weight="weight", exclude_nodes=exclude_nodes_set)
# Enlever le nœud de départ des résultats
distances.pop(index_depart, None)
paths.pop(index_depart, None)
if not distances:
print(f"Aucun nœud accessible depuis '{noeud_depart_label}' sans passer par les nœuds exclus.")
return
# Calculer le flux total pour chaque chemin (Option 1: utiliser 1/distance)
flux_totaux = {target: 1 / distance for target, distance in distances.items()}
# Trier les nœuds cibles en fonction du flux total (ordre décroissant)
flux_trie = sorted(flux_totaux.items(), key=lambda x: x[1], reverse=True)
top_10 = flux_trie[:10]
# Mapper les index vers les labels
index_to_label = self.index_to_label
# Afficher les résultats
print(
f"Les 10 nœuds les plus proches de '{noeud_depart_label}' (en excluant les nœuds intermédiaires spécifiés) :"
)
for i, (index_noeud, flux_total) in enumerate(top_10, 1):
label_noeud = index_to_label.get(index_noeud, f"Nœud {index_noeud}")
print(f"{i}. {label_noeud} - Flux total: {flux_total:.2f}")
# Construire le sous-graphe
nodes_in_subgraph = set()
edges_in_subgraph = []
for index_noeud in [index for index, flux in top_10]:
if index_noeud in paths:
path = paths[index_noeud]
nodes_in_subgraph.update(path)
edges_in_subgraph.extend([(path[i], path[i + 1]) for i in range(len(path) - 1)])
nodes_in_subgraph.add(index_depart)
# Créer le sous-graphe
subgraph = self.G.subgraph(nodes_in_subgraph).edge_subgraph(edges_in_subgraph).copy()
# Préparer les labels
labels = {index: index_to_label.get(index, str(index)) for index in subgraph.nodes()}
# Visualisation
plt.figure(figsize=(12, 8), dpi=500)
ax = plt.gca()
# Positionnement des nœuds
pos = nx.spring_layout(subgraph, iterations=50, k=0.07, pos=nx.planar_layout(subgraph))
# pos = nx.planar_layout(subgraph, scale=10)
# Couleurs des nœuds
node_colors = []
for node in subgraph.nodes():
if node == index_depart:
node_colors.append("red") # Nœud de départ en rouge
elif node in [index for index, flux in top_10]:
node_colors.append("lightblue") # Top 10 nœuds en bleu clair
else:
node_colors.append("gray") # Nœuds intermédiaires en gris
# Dessiner les nœuds sous forme de rectangles
for node, (x, y) in pos.items():
node_color = (
"red" if node == index_depart else "lightblue" if node in [index for index, flux in top_10] else "gray"
)
ax.scatter(x, y, s=500, c=node_color, edgecolors="black", marker="s") # 's' pour carré/rectangle
# Ajouter les étiquettes avec un fond de la même couleur que les nœuds (boîtes rectangulaires)
for node, (x, y) in pos.items():
label = self.ajouter_retour_ligne_si_long(labels[node], 10)
node_color = "red" if label == noeud_depart_label else "lightblue"
ax.annotate(
label,
(x, y),
xytext=(0, 0),
textcoords="offset points",
ha="center",
va="center",
fontsize=10,
fontweight="bold",
bbox=dict(boxstyle="square,pad=0.3", facecolor=node_color, edgecolor=node_color, alpha=0.5),
)
# Dessiner les arêtes en fonction du flux (poids) avec des épaisseurs proportionnelles (échelle logarithmique)
edges_with_flux = list(subgraph.edges())
flux = [self.G[u][v]["weight"] for u, v in edges_with_flux]
# Échelle logarithmique pour l'épaisseur des arêtes
flux_scaled = [np.log(f + 1) for f in flux] # Ajouter 1 pour éviter log(0)
max_flux_scaled = max(flux_scaled)
edge_widths = [2 + (f / max_flux_scaled) * 10 for f in flux_scaled] # Épaisseur entre 2 et 12
# Dessiner les arêtes avec la méthode de la classe
self.draw_custom_networkx_edges(
subgraph,
pos,
edgelist=edges_with_flux,
width=edge_widths,
edge_color="blue",
arrows=True,
arrowstyle="->",
arrowsize=50,
node_size=300,
)
# Annoter les arêtes avec les flux
edge_labels = {(u, v): f"{self.G[u][v]['weight']:.2f}" for u, v in subgraph.edges()}
nx.draw_networkx_edge_labels(
subgraph, pos, edge_labels=edge_labels, font_color="black", font_weight="bold", font_size=8
)
plt.axis("off")
plt.margins(x=0.1, y=0.1)
plt.show()
[docs]
def afficher_chemins_graphiquement_tot(self, n):
"""
Affiche graphiquement les chemins trouvés dans le graphe, en mettant en évidence les nœuds et les arêtes
impliqués dans les chemins sélectionnés.
:param n: Nombre total de plus courts chemins à afficher
"""
# Initialiser l'analyseur pour obtenir les chemins
analyzer = GraphAnalyzer(self.G, self.label_to_index)
chemins = analyzer.obtenir_n_plus_courts_chemins(n)
pos = nx.spring_layout(self.G, seed=42) # Disposition des nœuds avec un seed pour la reproductibilité
plt.figure(figsize=(12, 8))
# Dessiner tous les nœuds du graphe avec des couleurs standard
nx.draw_networkx_nodes(self.G, pos, node_color="lightgray", node_size=500)
# Dessiner toutes les arêtes du graphe avec des couleurs gris clair
nx.draw_networkx_edges(self.G, pos, edge_color="lightgray", style="solid")
# Dessiner les labels des nœuds
nx.draw_networkx_labels(self.G, pos, font_size=10, labels=self.index_to_label)
# Mettre en évidence les chemins sélectionnés
for source, target, cost, chemin in chemins:
# Colorer les nœuds impliqués dans les chemins sélectionnés
nx.draw_networkx_nodes(self.G, pos, nodelist=chemin, node_color="lightblue", node_size=600)
# Dessiner les arêtes correspondant aux chemins sélectionnés
edges_in_path = [(chemin[i], chemin[i + 1]) for i in range(len(chemin) - 1)]
nx.draw_networkx_edges(self.G, pos, edgelist=edges_in_path, edge_color="blue", width=2.5)
# Annoter les distances le long des arêtes du chemin
edge_labels = {
(chemin[i], chemin[i + 1]): f"{1 / self.G[chemin[i]][chemin[i + 1]]['weight']:.2f}"
for i in range(len(chemin) - 1)
}
nx.draw_networkx_edge_labels(self.G, pos, edge_labels=edge_labels, font_color="red")
plt.title("Visualisation des plus courts chemins trouvés", fontsize=14)
plt.axis("off") # Cacher les axes
plt.show()
[docs]
def afficher_chemins_graphiquement_sous_graph(self, n):
"""
Affiche graphiquement les chemins trouvés dans un graphe, en mettant en évidence uniquement les nœuds et les arêtes
impliqués dans les chemins sélectionnés, avec les noms des nœuds affichés plutôt que leurs numéros.
:param n: Nombre total de plus courts chemins à afficher
"""
# Obtenir les 'n' plus courts chemins en utilisant l'analyseur
analyzer = GraphAnalyzer(self.G, self.label_to_index)
chemins = analyzer.obtenir_n_plus_courts_chemins(n)
# Créer un sous-graphe contenant uniquement les nœuds et arêtes impliqués dans les 'n' plus courts chemins
sous_graphe = nx.DiGraph()
# Ajouter les nœuds et arêtes des chemins sélectionnés dans le sous-graphe
for source, target, cost, chemin in chemins:
sous_graphe.add_nodes_from(chemin)
edges_in_path = [(chemin[i], chemin[i + 1]) for i in range(len(chemin) - 1)]
sous_graphe.add_edges_from(edges_in_path)
# Créer la position des nœuds pour le sous-graphe
pos = nx.spring_layout(sous_graphe, seed=42) # Layout pour le sous-graphe
plt.figure(figsize=(12, 8))
# Convertir les nœuds du sous-graphe en leurs noms
labels = {node: self.index_to_label[node] for node in sous_graphe.nodes()}
# Dessiner les nœuds du sous-graphe
nx.draw_networkx_nodes(sous_graphe, pos, node_color="lightblue", node_size=600)
# Dessiner les arêtes du sous-graphe
nx.draw_networkx_edges(sous_graphe, pos, edge_color="blue", width=2.5)
# Ajouter les labels des nœuds (affichant les noms plutôt que les indices)
nx.draw_networkx_labels(sous_graphe, pos, labels, font_size=10)
# Annoter les distances le long des arêtes du sous-graphe
edge_labels = {(u, v): f"{self.G[u][v]['weight']:.2f}" for u, v in sous_graphe.edges()}
nx.draw_networkx_edge_labels(sous_graphe, pos, edge_labels=edge_labels, font_color="red")
plt.axis("off") # Cacher les axes
plt.show()
[docs]
def afficher_chemins_graphiquement(self, n):
"""
Affiche graphiquement les chemins trouvés dans le graphe, en ajustant les arêtes pour qu'elles se terminent
à la périphérie des nœuds et non au centre. Utilise des labels avec un fond de la même couleur que les nœuds.
:param n: Nombre total de plus courts chemins à afficher
"""
# Vérifier que le mapping des labels est disponible
if self.label_to_index is None or self.index_to_label is None:
print("Le mapping des labels vers les indices n'est pas défini.")
return
# Obtenir les 'n' plus courts chemins en utilisant l'analyseur
analyzer = GraphAnalyzer(self.G, self.label_to_index)
chemins = analyzer.obtenir_n_plus_courts_chemins(n)
index_to_label = self.index_to_label
# Créer un sous-graphe
sous_graphe = nx.DiGraph()
for source, target, cost, chemin in chemins:
sous_graphe.add_nodes_from(chemin)
edges_in_path = [(chemin[i], chemin[i + 1]) for i in range(len(chemin) - 1)]
sous_graphe.add_edges_from(edges_in_path)
# Position des nœuds
pos = nx.shell_layout(sous_graphe)
# Vous pouvez essayer d'autres layouts en décommentant l'une des lignes ci-dessous
# pos = nx.planar_layout(sous_graphe)
# pos = nx.random_layout(sous_graphe)
# pos = nx.spiral_layout(sous_graphe)
# pos = nx.spring_layout(sous_graphe, k=2)
plt.figure(figsize=(10, 10))
labels = {node: index_to_label[node] for node in sous_graphe.nodes()}
# Taille et couleur des nœuds
node_size = 8000
node_radius = np.sqrt(node_size) / 500 # Rayon des nœuds (approximé)
node_color = "orange"
# Dessiner les nœuds
nx.draw_networkx_nodes(sous_graphe, pos, node_color=node_color, node_size=node_size, edgecolors="black")
# Ajuster les arêtes pour qu'elles ne se terminent pas au centre des nœuds
ax = plt.gca()
for u, v in sous_graphe.edges():
x1, y1 = pos[u]
x2, y2 = pos[v]
# Calculer l'angle de la direction de l'arête
angle = np.arctan2(y2 - y1, x2 - x1)
# Décaler les points de départ et d'arrivée de l'arête pour éviter que la flèche ne se termine sous les nœuds
x1_new = x1 + node_radius * np.cos(angle)
y1_new = y1 + node_radius * np.sin(angle)
x2_new = x2 - node_radius * np.cos(angle)
y2_new = y2 - node_radius * np.sin(angle)
# Dessiner une arête courbée avec la flèche ajustée
if (v, u) in sous_graphe.edges():
arrow = FancyArrowPatch(
(x1_new, y1_new),
(x2_new, y2_new),
connectionstyle="arc3,rad=0.15",
color="darkblue",
lw=6,
arrowstyle="-|>",
mutation_scale=40,
)
else:
arrow = FancyArrowPatch(
(x1_new, y1_new),
(x2_new, y2_new),
connectionstyle="arc3,rad=0.05",
color="darkblue",
lw=6,
arrowstyle="-|>",
mutation_scale=40,
)
ax.add_patch(arrow)
# Ajouter les étiquettes avec un fond de la même couleur que les nœuds
for node, (x, y) in pos.items():
label = self.ajouter_retour_ligne_si_long(labels[node], 10)
ax.annotate(
label,
(x, y),
xytext=(0, 0),
textcoords="offset points",
ha="center",
va="center",
fontsize=10,
fontweight="bold",
bbox=dict(boxstyle="round,pad=0.3", facecolor=node_color, edgecolor=node_color, alpha=0),
)
# Annoter les flux sur les arêtes
edge_labels = {(u, v): f"{self.G[u][v]['weight']:.2f}" for u, v in sous_graphe.edges()}
nx.draw_networkx_edge_labels(
sous_graphe, pos, edge_labels=edge_labels, font_color="black", font_size=10, font_weight="bold"
)
plt.axis("off")
plt.margins(x=0.15, y=0.15)
plt.show()
def _hexagonal_layout_fixed(self, node_labels, scale=1.0):
"""
Crée un layout hexagonal compact pour les nœuds du graphe en suivant un ordre précis.
:param node_labels: Liste des labels des nœuds à placer dans cet ordre
:param scale: Échelle de la disposition
:return: Dictionnaire des positions des nœuds (hexagonal)
"""
pos = {}
width = np.ceil(np.sqrt(len(node_labels))).astype(int) # Calcul de la largeur nécessaire
x_spacing = scale
y_spacing = np.sqrt(3) / 2 * scale
for i, label in enumerate(node_labels):
if label in self.label_to_index:
node_index = self.label_to_index[label]
row = i // width
col = i % width
x = col * x_spacing
y = row * y_spacing
if row % 2 == 1:
x += x_spacing / 2 # Décalage pour créer une disposition hexagonale
pos[node_index] = (x, y)
else:
# Le nœud avec ce label n'est pas présent, on passe
print(f"Problème avec le label '{label}'")
continue
return pos
[docs]
def draw_hexagonal_graph(self, node_property, year=None, node_labels=None, scale=1.0, filename=None):
"""
Dessine un graphe avec un layout hexagonal et une échelle logarithmique dynamique,
en tenant compte d'un ordre fixe des nœuds spécifié par node_labels.
:param node_property: Propriété scalaire des nœuds (dict) à représenter avec une échelle logarithmique pour la taille des nœuds
:param year: L'année à afficher en haut à gauche de l'image
:param node_labels: Liste des labels des nœuds pour fixer leur ordre (par défaut, tous les labels)
:param scale: Échelle du layout hexagonal
:param filename: Nom du fichier si vous souhaitez sauvegarder l'image
"""
if node_labels is None:
node_labels = list(self.label_to_index.keys())
node_property_full = {label: node_property.get(self.label_to_index[label], 0) for label in node_labels}
# Définir les limites log pour la propriété des nœuds
log_min = 0 # Correspond à 10^0
log_max = 2.2 # Correspond à 10^2.2
# Limiter les valeurs des propriétés des nœuds dans la plage [10^log_min, 10^log_max]
node_property_full = {
node: min(max(value, 10**log_min), 10**log_max) for node, value in node_property_full.items()
}
# Normaliser les tailles des nœuds sur une échelle logarithmique
min_size = 10 # Taille minimale d'un nœud
max_size = 10000 # Taille maximale d'un nœud
log_node_property = np.log10([node_property_full[node] for node in node_labels])
# Convertir les propriétés log-scaled en tailles de nœuds, encadrées entre log_min et log_max
node_sizes = [
min_size + (max_size - min_size) * (value - log_min) / (log_max - log_min) for value in log_node_property
]
# Utiliser le layout hexagonal
pos = self._hexagonal_layout_fixed(node_labels, scale=scale)
# Créer une figure avec un fond noir
plt.figure(figsize=(14, 14), facecolor="black")
ax = plt.gca()
ax.set_facecolor("black")
# Tracer les nœuds avec les tailles ajustées
node_colors = [self.G.nodes[node]["color"] for node in self.G.nodes()]
nx.draw_networkx_nodes(self.G, pos, node_size=node_sizes, node_color=node_colors, alpha=1)
# Tracer les arêtes avec des couleurs plus neutres (blanches) et une largeur fine pour les liaisons
nx.draw_networkx_edges(self.G, pos, edge_color="white", alpha=0.1, width=0.5)
# Afficher les labels pour les nœuds les plus gros uniquement (par exemple, les nœuds au-dessus d'une taille seuil)
size_threshold = 1500 # Seuil pour afficher les labels (les plus gros nœuds uniquement)
labels = {
node: f"{self.ajouter_retour_ligne_si_long(self.index_to_label[node], 10)}"
for node, size in zip(self.G.nodes(), node_sizes)
if size > size_threshold
}
# Dessiner les labels
nx.draw_networkx_labels(self.G, pos, labels, font_size=10, font_color="white")
# Supprimer les axes pour une meilleure lisibilité
plt.axis("off")
# Ajouter l'année en haut de l'image si spécifiée
if year is not None:
plt.text(
0.02,
0.98,
f"Année = {year}",
transform=ax.transAxes,
fontsize=14,
fontweight="bold",
va="top",
ha="left",
color="white",
)
if filename is None:
# Afficher le graphe
plt.show()
else:
plt.savefig(filename, format="png", dpi=350)
plt.close() # Fermer la figure pour ne pas l'afficher
[docs]
class GraphAnalyzer:
def __init__(self, G):
self.G = G
self.label_to_index = label_to_index
[docs]
@staticmethod
def calculate_Neff(matrix):
"""
Calcule N = C.F à partir d'une matrice NumPy ou d'un graphe NetworkX.
"""
# Si c'est un graphe NetworkX, on extrait la matrice d'adjacence
if isinstance(matrix, nx.Graph):
matrix = nx.to_numpy_array(matrix)
# Total T : somme de tous les éléments de la matrice
T = np.sum(matrix)
# Somme des lignes T_{i.} et des colonnes T_{.j}
row_sums = np.sum(matrix, axis=1)
col_sums = np.sum(matrix, axis=0)
# Calcul du produit selon la formule
product = 1.0
for i in range(matrix.shape[0]):
for j in range(matrix.shape[1]):
if matrix[i, j] > 0: # Éviter de calculer si T_{ij} est nul
factor = (T**2) / (row_sums[i] * col_sums[j])
exponent = matrix[i, j] / (2 * T)
product *= factor**exponent
return product
[docs]
@staticmethod
def calculate_Feff(matrix):
"""
Calcule F à partir d'une matrice NumPy ou d'un graphe NetworkX.
"""
# Si c'est un graphe NetworkX, on extrait la matrice d'adjacence
if isinstance(matrix, nx.Graph):
matrix = nx.to_numpy_array(matrix)
# Total T : somme de tous les éléments de la matrice
T = np.sum(matrix)
# Calcul du produit selon la formule
product = 1.0
for i in range(matrix.shape[0]):
for j in range(matrix.shape[1]):
T_ij = matrix[i, j]
if T_ij > 0: # Éviter les divisions par zéro ou les logs de zéro
factor = T_ij / T
exponent = -T_ij / T
product *= factor**exponent
return product
[docs]
@staticmethod
def calculate_Ceff(matrix):
return GraphAnalyzer.calculate_Feff(matrix) / GraphAnalyzer.calculate_Neff(matrix)
[docs]
@staticmethod
def calculate_Reff(matrix):
return GraphAnalyzer.calculate_Neff(matrix) ** 2 / GraphAnalyzer.calculate_Feff(matrix)
[docs]
def plot_flux_distribution_semilog(self):
# Extraire les poids des arêtes
edge_weights = [data["weight"] for u, v, data in self.G.edges(data=True)]
# Créer l'histogramme sans transformation logarithmique
plt.figure(figsize=(8, 6))
plt.hist(edge_weights, bins=100, color="skyblue", edgecolor="black", log=True)
# Ajouter un titre et des étiquettes
plt.title("Distribution des poids des arêtes (Flux) - Échelle linéaire", fontsize=16)
plt.xlabel("Poids des arêtes (Flux)", fontsize=14)
plt.ylabel("Nombre de réalisations", fontsize=14)
# Ajouter une grille
plt.grid(True, alpha=0.5)
# Afficher le graphique
plt.show()
[docs]
def plot_flux_distribution_log(self):
# Extraire les poids des arêtes
edge_weights = [data["weight"] for u, v, data in self.G.edges(data=True)]
# Appliquer une transformation logarithmique aux poids des arêtes
log_weights = np.log10(edge_weights)
# Créer l'histogramme
plt.figure(figsize=(8, 6))
plt.hist(log_weights, bins=50, color="deepskyblue", edgecolor="k", log=True)
# plt.title('Distribution des poids des arêtes (Flux) - Échelle logarithmique', fontsize=16)
plt.xlabel("Flow size (log scale)", fontsize=14)
plt.ylabel("Number of occurrences (Log scale)", fontsize=14)
plt.grid(True, alpha=0.5)
plt.show()
[docs]
def plot_flux_distribution_log_x(self):
# Extraire les poids des arêtes
edge_weights = [data["weight"] for u, v, data in self.G.edges(data=True)]
# Créer l'histogramme
plt.figure(figsize=(8, 6))
plt.hist(edge_weights, bins=10000, color="skyblue", edgecolor="black")
# Appliquer une échelle logarithmique sur l'axe X uniquement
plt.xscale("log") # Échelle log pour l'axe X
# L'axe Y reste linéaire par défaut
# Ajouter un titre et des étiquettes
plt.title("Distribution des poids des arêtes (Flux) - Échelle logarithmique sur X", fontsize=16)
plt.xlabel("Poids des arêtes (Flux) (log)", fontsize=14)
plt.ylabel("Nombre de réalisations", fontsize=14)
# Ajouter une grille
plt.grid(True, alpha=0.5)
# Afficher le graphique
plt.show()
[docs]
def plot_degree_distribution_power_law(self):
# Extraire les degrés des nœuds
degrees = [degree for node, degree in self.G.degree()]
# Créer un histogramme des degrés
plt.figure(figsize=(8, 6))
plt.hist(degrees, bins=np.logspace(np.log10(1), np.log10(max(degrees)), 10), color="skyblue", edgecolor="black")
plt.xscale("log")
plt.yscale("log")
plt.title("Distribution des degrés (Loi de puissance)", fontsize=16)
plt.xlabel("Degré (Échelle log)", fontsize=14)
plt.ylabel("Fréquence (Échelle log)", fontsize=14)
plt.grid(True, which="both", ls="--", alpha=0.7)
plt.show()
[docs]
def plot_weight_probability_distribution(self, N_bins=150):
# Extraire les poids des arêtes
edge_weights = [data["weight"] for u, v, data in self.G.edges(data=True)]
# Créer un histogramme des poids avec normalisation
plt.figure(figsize=(8, 6))
counts, bins, patches = plt.hist(
edge_weights,
bins=np.logspace(np.log10(min(edge_weights)), np.log10(max(edge_weights)), N_bins),
density=True,
cumulative=False,
color="deepskyblue",
edgecolor="black",
)
plt.xscale("log") # Échelle logarithmique sur l'axe des poids
plt.yscale("log") # Optionnel
plt.xlabel("Edge Weight (Log Scale)", fontsize=14)
plt.ylabel("Probability Density (Log Scale)", fontsize=14)
plt.grid(True, which="both", ls="--", alpha=0.3)
plt.show()
[docs]
def obtenir_n_plus_courts_chemins(self, n):
"""
Obtenir les 'n' plus courts chemins dans tout le graphe en appliquant l'algorithme dijkstra_exclusion,
c'est-à-dire en excluant certains nœuds comme nœuds intermédiaires.
:param G: Graphe NetworkX avec un attribut 'weight' pour les arêtes
:param n: Nombre total de chemins à retourner
:return: Liste des 'n' plus courts chemins [(source, cible, coût, chemin)]
"""
# Liste pour stocker tous les chemins trouvés
tous_les_chemins = []
all_nodes = list(self.G.nodes()) # Liste de tous les nœuds du graphe
ext_nodes = [self.label_to_index[label] for label in ext]
exclude_nodes_set = set(ext_nodes) # Ensemble des nœuds à exclure comme nœuds intermédiaires
# Parcourir chaque nœud comme nœud de départ
for source in all_nodes:
# Calculer les plus courts chemins depuis 'source' en appliquant dijkstra_exclusion
distances, paths = self.dijkstra_exclusion(source, weight="weight", exclude_nodes=exclude_nodes_set)
# Ajouter tous les chemins trouvés dans la liste
for target, cost in distances.items():
if source != target:
chemin = paths[target] # Chemin de 'source' vers 'target'
tous_les_chemins.append((source, target, cost, chemin))
# Trier tous les chemins trouvés par le coût (ordre croissant)
tous_les_chemins_triees = sorted(tous_les_chemins, key=lambda x: x[2])
# Retourner les 'n' plus courts chemins (ou moins si moins de chemins sont trouvés)
return tous_les_chemins_triees[:n]
[docs]
def calculer_longueur_moyenne_chemins(self):
longueurs_chemins = [] # Liste pour stocker les longueurs des plus courts chemins
# Calculer les plus courts chemins depuis chaque nœud source
for source, longueurs_cibles in nx.all_pairs_dijkstra_path_length(self.G, weight="weight"):
for cible, longueur in longueurs_cibles.items():
if source != cible:
longueurs_chemins.append(longueur)
# Vérifier s'il y a des chemins pour éviter la division par zéro
if len(longueurs_chemins) == 0:
return float("inf") # Ou retourner une valeur appropriée si aucun chemin n'existe
longueur_moyenne = sum(longueurs_chemins) / len(longueurs_chemins)
return longueur_moyenne
[docs]
def calculer_et_tracer_distribution_distances(
self,
nb_bins=50,
titre="Distribution des distances entre les nœuds",
sauvegarder=False,
nom_fichier="histogramme_distances.png",
):
"""
Calcule les plus courts chemins entre toutes les paires de nœuds dans un graphe pondéré dirigé,
en utilisant la distance définie comme la somme des inverses des poids,
et trace un histogramme de la distribution des distances avec une échelle logarithmique sur l'axe des abscisses.
:param nb_bins: Nombre de barres dans l'histogramme
:param titre: Titre du graphique
:param sauvegarder: Booléen pour indiquer si le graphique doit être sauvegardé
:param nom_fichier: Nom du fichier pour la sauvegarde du graphique
"""
# Vérifier que le graphe a des poids sur les arêtes
if not nx.get_edge_attributes(self.G, "weight"):
raise ValueError("Le graphe n'a pas d'attribut 'weight' sur les arêtes.")
# Définir la fonction de poids inversé
def inverse_weight(u, v, d):
poids = d.get("weight", 1)
if poids > 0:
return 1 / poids
else:
return float("inf") # Gérer les poids nuls ou négatifs
# Calculer les plus courts chemins en utilisant la fonction de poids inversé
# Cela renvoie un dictionnaire de dictionnaires : lengths[source][target] = distance
longueurs_dict = dict(nx.all_pairs_dijkstra_path_length(self.G, weight=inverse_weight))
# Collecter les distances dans une liste
longueurs_chemins = []
for source in longueurs_dict:
for cible, longueur in longueurs_dict[source].items():
if source != cible:
longueurs_chemins.append(longueur)
# Filtrer les distances positives pour l'échelle logarithmique
longueurs_positives = [l for l in longueurs_chemins if l > 0]
# Tracer l'histogramme
plt.figure(figsize=(10, 6))
sns.histplot(longueurs_positives, bins=nb_bins, log_scale=(True, False))
plt.xlabel("Longueur du plus court chemin (échelle logarithmique)")
plt.ylabel("Nombre de paires de nœuds")
plt.title(titre)
if sauvegarder:
plt.savefig(nom_fichier)
plt.show()
# Calculer la longueur moyenne des chemins
if longueurs_chemins:
longueur_moyenne = sum(longueurs_chemins) / len(longueurs_chemins)
else:
longueur_moyenne = np.nan
return longueur_moyenne
[docs]
def calculer_et_tracer_distribution_distances_V2(
self,
nb_bins=50,
titre="Distribution des distances entre les nœuds",
sauvegarder=False,
nom_fichier="histogramme_distances.png",
):
"""
Calcule les plus courts chemins en excluant les chemins passant par des nœuds intermédiaires spécifiques,
puis trace l'histogramme des longueurs des plus courts chemins.
:param nb_bins: Nombre de barres dans l'histogramme
:param titre: Titre du graphique
:param sauvegarder: Booléen indiquant si le graphique doit être sauvegardé
:param nom_fichier: Nom du fichier pour la sauvegarde du graphique
:return: Longueur moyenne des chemins
"""
longueurs_chemins = []
all_nodes = list(self.G.nodes())
# Exclure les nœuds en tant que nœuds intermédiaires, mais les autoriser en tant que source ou cible
ext_nodes = [self.label_to_index[label] for label in ext]
exclude_nodes_set = set(ext_nodes)
for source in all_nodes:
# Exclure le nœud source des nœuds à exclure
exclude_nodes = exclude_nodes_set - {source}
distances, paths = self.dijkstra_exclusion(source, weight="weight", exclude_nodes=exclude_nodes)
for target, length in distances.items():
if source != target:
longueurs_chemins.append(length)
# Filtrer les longueurs positives pour l'échelle logarithmique
longueurs_positives = [l for l in longueurs_chemins if l > 0]
# Tracer l'histogramme
plt.figure(figsize=(10, 6))
sns.histplot(longueurs_positives, bins=nb_bins, log_scale=(True, False))
plt.xlabel("Longueur du plus court chemin (échelle logarithmique)")
plt.ylabel("Nombre de paires de nœuds")
plt.title(titre)
if sauvegarder:
plt.savefig(nom_fichier)
plt.show()
# Calculer et retourner la longueur moyenne des chemins
if len(longueurs_positives) == 0:
return float("inf")
longueur_moyenne = sum(longueurs_positives) / len(longueurs_positives)
return longueur_moyenne
[docs]
def dijkstra_exclusion(self, source, weight="weight", exclude_nodes=set()):
"""
Algorithme de Dijkstra personnalisé pour calculer les plus courts chemins dans un graphe,
en excluant les nœuds intermédiaires appartenant à 'exclude_nodes', tout en permettant que
les nœuds exclus soient utilisés comme nœud de départ ou nœud final.
:param source: Le nœud de départ à partir duquel les chemins doivent être calculés
:param weight: Nom de l'attribut des arêtes représentant les poids (par défaut 'weight')
:param exclude_nodes: Ensemble des nœuds à exclure en tant que nœuds intermédiaires
:return: distances, paths
distances: dictionnaire {nœud: coût du plus court chemin depuis le nœud de départ}
paths: dictionnaire {nœud: liste représentant le chemin le plus court depuis le nœud de départ}
"""
distances = {}
paths = {}
queue = [(0, source, [source])]
while queue:
cost, u, path = heapq.heappop(queue)
# Si nous avons déjà trouvé un chemin vers 'u', nous passons
if u in distances:
continue
distances[u] = cost
paths[u] = path
for v, data in self.G[u].items():
# Éviter les cycles
if v in path:
continue
edge_weight = data.get(weight, 1)
if edge_weight == 0:
continue # Évite la division par zéro
new_cost = cost + 1 / edge_weight
new_path = path + [v]
# Vérifier si le nouveau chemin contient des nœuds exclus en tant que nœuds intermédiaires
# Exclure le premier et le dernier nœud du chemin
intermediates = new_path[1:-1]
has_excluded_intermediate = any(node in exclude_nodes for node in intermediates)
if has_excluded_intermediate:
continue # Ne pas ajouter 'v' si le chemin passe par un nœud exclu intermédiaire
# Enregistrer le chemin si c'est un meilleur coût ou s'il n'a pas encore été visité
if v not in distances or new_cost < distances[v]:
heapq.heappush(queue, (new_cost, v, new_path))
return distances, paths
[docs]
def in_centrality(self):
"""
Calcule la centralité d'entrée pondérée (somme des poids entrants) pour chaque nœud du graphe.
:return: Dictionnaire {nœud: centralité d'entrée pondérée}
"""
in_centrality_dict = {index_to_label[node]: 0 for node in self.G.nodes()}
for node in self.G.nodes():
# Pour chaque nœud, on somme les poids des arêtes entrantes
for predecessor, _, data in self.G.in_edges(node, data=True):
weight = data.get("weight", 0) # On récupère le poids de l'arête (défaut : 0)
in_centrality_dict[index_to_label[node]] += weight
return in_centrality_dict
[docs]
def out_centrality(self):
"""
Calcule la centralité de sortie pondérée (somme des poids sortants) pour chaque nœud du graphe.
:return: Dictionnaire {nœud: centralité de sortie pondérée}
"""
out_centrality_dict = {index_to_label[node]: 0 for node in self.G.nodes()}
for node in self.G.nodes():
# Pour chaque nœud, on somme les poids des arêtes sortantes
for _, successor, data in self.G.out_edges(node, data=True):
weight = data.get("weight", 0) # On récupère le poids de l'arête (défaut : 0)
out_centrality_dict[index_to_label[node]] += weight
return out_centrality_dict
[docs]
def weighted_assortativity(self):
# Calculer les sommes des poids des arêtes pour les nœuds
degree_dict = dict(self.G.degree(weight="weight"))
# Extraire les arêtes et leurs poids
edges = list(self.G.edges(data=True))
weight_sum = 0
degree_product_sum = 0
degree_diff_sum = 0
for u, v, data in edges:
weight = data["weight"]
deg_u = degree_dict[u]
deg_v = degree_dict[v]
weight_sum += weight
degree_product_sum += deg_u * deg_v
degree_diff_sum += (deg_u - deg_v) ** 2
# Calcul de l'assortativité pondérée (une approximation)
return weight_sum / degree_product_sum if degree_product_sum != 0 else 0
# Implémentez les autres méthodes d'analyse selon vos besoins