NNGT/testing/library_compatibility.py

795 lines
23 KiB
Python

# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2015-2023 Tanguy Fardet
# SPDX-License-Identifier: GPL-3.0-or-later
# testing/library_compatibility.py
# This file is part of the NNGT module
# Distributed as a free software, in the hope that it will be useful, under the
# terms of the GNU General Public License.
"""
Test that use some existing graph libraries for graph analysis, or that compare
the results with an existing implementation.
"""
import numpy as np
import nngt
import nngt.analysis as na
backends = ["networkx", "igraph", "graph-tool"]
all_backends = ["networkx", "igraph", "graph-tool", "nngt"]
def test_weighted_undirected_clustering():
'''
Compare the onnela implementation with networkx and Barrat with igraph.
'''
# import networkx and igraph
import networkx as nx
import igraph as ig
# create a pre-defined graph
num_nodes = 5
edge_list = [
(0, 3), (1, 0), (1, 2), (2, 4), (4, 1), (4, 3), (4, 0)
]
weights = [0.53, 0.45, 0.8, 0.125, 0.66, 0.31, 0.78]
# create nx graph and compute reference onnela clustering
nx_graph = nx.Graph()
nx_graph.add_nodes_from(range(num_nodes))
arr_edges = [e + (w,) for e, w in zip(edge_list, weights)]
nx_graph.add_weighted_edges_from(arr_edges, weight="weight")
onnela = list(nx.clustering(nx_graph, weight="weight").values())
triplets = [3, 3, 1, 1, 6]
gc_onnela = np.sum(np.multiply(onnela, triplets)) / np.sum(triplets)
# create ig graph and compute reference Barrat clustering
ig_graph = ig.Graph(num_nodes, directed=False)
ig_graph.add_edges(edge_list)
ig_graph.es["weight"] = weights
barrat = ig_graph.transitivity_local_undirected(mode="zero",
weights="weight")
strength = np.array(ig_graph.strength(weights='weight'))
degree = np.array(ig_graph.degree())
triplets = strength*(degree - 1)
gc_barrat = np.sum(np.multiply(barrat, triplets)) / np.sum(triplets)
# check for all backends
for bckd in all_backends:
nngt.set_config("backend", bckd)
g = nngt.Graph(nodes=num_nodes, directed=False)
g.new_edges(edge_list, attributes={"weight": weights})
# onnela
assert np.all(np.isclose(
na.local_clustering(g, weights="weight", method="onnela"),
onnela))
assert np.isclose(
na.global_clustering(g, weights="weight", method="onnela"),
gc_onnela)
# barrat
assert np.all(np.isclose(
na.local_clustering(g, weights="weight", method="barrat"),
barrat))
assert np.isclose(
na.global_clustering(g, weights="weight", method="barrat"),
gc_barrat)
# fully reciprocal directed version
g = nngt.Graph(nodes=num_nodes, directed=True)
g.new_edges(edge_list, attributes={"weight": weights})
g.new_edges(np.array(edge_list, dtype=int)[:, ::-1],
attributes={"weight": weights})
assert np.all(np.isclose(
na.local_clustering(g, weights="weight", method="onnela"),
onnela))
assert np.isclose(
na.global_clustering(g, weights="weight", method="onnela"),
gc_onnela)
assert np.all(np.isclose(
na.local_clustering(g, weights="weight", method="barrat"),
barrat))
assert np.isclose(
na.global_clustering(g, weights="weight", method="barrat"),
gc_barrat)
def test_assortativity():
''' Check assortativity result for all backends '''
# DIRECTED
num_nodes = 5
edge_list = [(0, 3), (1, 0), (1, 2), (2, 4), (4, 1), (4, 3), (4, 2)]
weights = [0.53, 0.45, 0.8, 0.125, 0.66, 0.31, 0.78]
# expected results
assort_unweighted = -0.47140452079103046
assort_weighted = -0.5457956719785911
for bckd in backends:
nngt.set_config("backend", bckd)
g = nngt.Graph(nodes=num_nodes)
g.new_edges(edge_list, attributes={"weight": weights})
assert np.isclose(
nngt.analyze_graph["assortativity"](g, "in"), assort_unweighted)
# not check weighted version for networkx for now
assert np.isclose(
nngt.analyze_graph["assortativity"](g, "in", weights=True),
assort_weighted)
# UNDIRECTED
edge_list = [(0, 3), (1, 0), (1, 2), (2, 4), (4, 1), (4, 3)]
weights = weights[:len(edge_list)]
# expected results
assort_unweighted = -0.33333333333333215
assort_weighted = -0.27351320394915296
for bckd in backends:
nngt.set_config("backend", bckd)
g = nngt.Graph(nodes=num_nodes, directed=False)
g.new_edges(edge_list, attributes={"weight": weights})
assert np.isclose(
nngt.analyze_graph["assortativity"](g, "in"), assort_unweighted)
# not check weighted version for networkx for now
if bckd != "networkx":
assert np.isclose(
nngt.analyze_graph["assortativity"](g, "in", weights=True),
assort_weighted)
def test_closeness():
''' Check closeness results for all backends '''
num_nodes = 5
edge_list = [(0, 1), (0, 3), (1, 3), (2, 0), (3, 2), (3, 4), (4, 2)]
weights = [0.54881, 0.71518, 0.60276, 0.54488, 0.42365, 0.64589, 0.43758]
expected = [2/3, 0.5, 0.5, 0.5714285714285714, 0.4444444444444444]
weighted = [1.06273031, 0.89905622, 0.83253895, 1.12504606, 0.86040934]
for bckd in backends:
nngt.set_config("backend", bckd)
g = nngt.Graph(nodes=num_nodes, directed=True)
g.new_edges(edge_list, attributes={"weight": weights})
assert np.all(np.isclose(
nngt.analyze_graph["closeness"](g, harmonic=False), expected))
assert np.all(np.isclose(
nngt.analyze_graph["closeness"](g, weights=True, harmonic=False),
weighted))
# with zero degrees and harmonic implementation
# closeness does not work for igraph if some nodes have zero in/out-degrees
edge_list = [(0, 1), (0, 2), (0, 3), (1, 3), (3, 2), (3, 4), (4, 2)]
# DIRECTED
harmonic = [7/8, 0.5, 0, 0.5, 1/4]
arithmetic = [0.8, 0.6, np.NaN, 1., 1.]
harmonic_in = [0, 1/4, 7/8, 0.5, 0.5]
arithmetic_in = [np.NaN, 1, 0.8, 1., 0.6]
harmonic_wght = [1.42006842, 0.92688794, 0., 0.97717257, 0.5713241 ]
arithmetic_wght = [1.28394428, 1.10939361, np.NaN, 1.86996279, 2.2852964]
for bckd in backends:
nngt.set_config("backend", bckd)
g = nngt.Graph(nodes=num_nodes, directed=True)
g.new_edges(edge_list, attributes={"weight": weights})
assert np.all(np.isclose(
nngt.analyze_graph["closeness"](g, harmonic=True), harmonic))
assert np.all(np.isclose(
nngt.analyze_graph["closeness"](g, harmonic=False), arithmetic,
equal_nan=True))
assert np.all(np.isclose(
nngt.analyze_graph["closeness"](g, mode="in", harmonic=True),
harmonic_in))
assert np.all(np.isclose(
nngt.analyze_graph["closeness"](g, mode="in", harmonic=False),
arithmetic_in, equal_nan=True))
assert np.all(np.isclose(
nngt.analyze_graph["closeness"](g, weights=True, harmonic=True),
harmonic_wght))
assert np.all(np.isclose(
nngt.analyze_graph["closeness"](g, weights=True, harmonic=False),
arithmetic_wght, equal_nan=True))
# UNDIRECTED
harmonic = [7/8, 3/4, 7/8, 1., 3/4]
arithmetic = [0.8, 2/3, 0.8, 1., 2/3]
harmonic_wght = [1.436723, 1.382419, 1.76911934, 1.85074797, 1.38520591]
arithmetic_wght = [1.3247182, 1.2296379, 1.5717462, 1.8040934, 1.16720163]
for bckd in backends:
nngt.set_config("backend", bckd)
g = nngt.Graph(nodes=num_nodes, directed=False)
g.new_edges(edge_list, attributes={"weight": weights})
assert np.all(np.isclose(
nngt.analyze_graph["closeness"](g, harmonic=True), harmonic))
assert np.all(np.isclose(
nngt.analyze_graph["closeness"](g, weights=True, harmonic=True),
harmonic_wght))
assert np.all(np.isclose(
nngt.analyze_graph["closeness"](g, harmonic=False), arithmetic,
equal_nan=True))
assert np.all(np.isclose(
nngt.analyze_graph["closeness"](g, weights="weight",
harmonic=False),
arithmetic_wght, equal_nan=True))
def test_betweenness():
''' Check betweenness results for all backends '''
num_nodes = 5
# UNDIRECTED
edge_list = [(0, 1), (0, 2), (0, 3), (1, 3), (2, 3), (2, 4), (3, 4)]
weights = [0.1, 0.3, 1.5, 0.8, 5.6, 4., 2.3]
nb_expect = [0.083333, 0.0, 0.083333, 0.3333333, 0.0]
eb_expect = [0.15, 0.2, 0.15, 0.25, 0.15, 0.15, 0.25]
nb_exp_wght = [0.5, 2/3, 0, 0.5, 0]
eb_exp_wght = [0.6, 0.4, 0, 0.6, 0, 0, 0.4]
for bckd in backends:
nngt.set_config("backend", bckd)
g = nngt.Graph(nodes=num_nodes, directed=False)
g.new_edges(edge_list, attributes={"weight": weights})
nb, eb = nngt.analyze_graph["betweenness"](g)
assert np.all(np.isclose(nb, nb_expect))
assert np.all(np.isclose(eb, eb_expect))
# weighted
nb, eb = nngt.analyze_graph["betweenness"](g, weights=True)
assert np.all(np.isclose(nb, nb_exp_wght))
assert np.all(np.isclose(eb, eb_exp_wght))
# DIRECTED
edge_list = [
(0, 1), (0, 2), (0, 3), (1, 3), (3, 2), (3, 4), (4, 2), (4, 0)
]
weights = [0.1, 0.3, 1.5, 0.8, 5.6, 4., 2.3, 0.9]
nb_expect = [0.25, 0, 0, 1/3, 0.25]
eb_expect = [0.15, 0.05, 0.15, 0.2, 0.1, 0.3, 0.05, 0.3]
nb_exp_wght = [0.5, 0.25, 0, 1/3, 5/12]
eb_exp_wght = [0.3, 0.2, 0, 0.35, 0, 0.4, 0, 0.45]
for bckd in backends:
nngt.set_config("backend", bckd)
g = nngt.Graph(nodes=num_nodes, directed=True)
g.new_edges(edge_list, attributes={"weight": weights})
nb, eb = nngt.analyze_graph["betweenness"](g)
assert np.all(np.isclose(nb, nb_expect))
assert np.all(np.isclose(eb, eb_expect))
# weighted
nb, eb = nngt.analyze_graph["betweenness"](g, weights=True)
assert np.all(np.isclose(nb, nb_exp_wght))
assert np.all(np.isclose(eb, eb_exp_wght))
def test_components():
''' Check connected components for all backends '''
num_nodes = 8
edge_list = [(0, 1), (0, 2), (1, 2)]
for i in range(3, num_nodes - 1):
edge_list.append((i, i+1))
edge_list.append((7, 3))
for bckd in all_backends:
nngt.set_config("backend", bckd)
g = nngt.Graph(nodes=num_nodes, directed=True)
g.new_edges(edge_list)
# scc
cc, hist = \
nngt.analyze_graph["connected_components"](g, ctype="scc")
idx = np.array([i for i in range(num_nodes)], dtype=int)
# nodes are assigned to the correct components
assert np.all(cc[0] not in cc[1:]) # 3 first are isolated
assert np.all(cc[1] not in cc[idx != 1]) # 3 first are isolated
assert np.all(cc[2] not in cc[idx != 2]) # 3 first are isolated
assert np.all(cc[3:] == cc[3]) # 5 last together
# hence counts should be 1, 1, 1, and 5
assert hist[cc[0]] == 1
assert hist[cc[1]] == 1
assert hist[cc[2]] == 1
assert hist[cc[5]] == 5
# wcc
cc, hist = \
nngt.analyze_graph["connected_components"](g, ctype="wcc")
# nodes are assigned to the correct components
assert np.all(cc[:3] == cc[0]) # 3 first together
assert np.all(cc[3:] == cc[3]) # 5 last together
# hence counts should be 3 and 5
assert hist[cc[0]] == 3
assert hist[cc[5]] == 5
# undirected
g = nngt.Graph(nodes=num_nodes, directed=False)
g.new_edges(edge_list)
cc, hist = \
nngt.analyze_graph["connected_components"](g)
# nodes are assigned to the correct components
assert np.all(cc[:3] == cc[0]) # 3 first together
assert np.all(cc[3:] == cc[3]) # 5 last together
# hence counts should be 3 and 5
assert hist[cc[0]] == 3
assert hist[cc[5]] == 5
def test_diameter():
''' Check connected components for all backends '''
# unconnected
num_nodes = 8
edge_list = [(0, 1), (0, 2), (1, 2)]
for i in range(3, num_nodes - 1):
edge_list.append((i, i+1))
edge_list.append((7, 3))
weights = [0.58, 0.59, 0.88, 0.8, 0.61, 0.66, 0.62, 0.28]
for bckd in backends:
nngt.set_config("backend", bckd)
g = nngt.Graph(nodes=num_nodes, directed=True)
g.new_edges(edge_list, attributes={"weight": weights})
assert np.isinf(nngt.analyze_graph["diameter"](g, weights=None))
assert np.isinf(nngt.analyze_graph["diameter"](g, weights=True))
# connected
num_nodes = 5
edge_list = [(0, 1), (0, 3), (1, 3), (2, 0), (3, 2), (3, 4), (4, 2)]
weights = [0.58, 0.59, 0.88, 0.8, 0.61, 0.66, 0.28]
for bckd in backends:
nngt.set_config("backend", bckd)
g = nngt.Graph(nodes=num_nodes, directed=True)
g.new_edges(edge_list, attributes={"weight": weights})
d = nngt.analyze_graph["diameter"](g)
assert nngt.analyze_graph["diameter"](g, weights=None) == 3
assert np.isclose(
nngt.analyze_graph["diameter"](g, weights="weight"), 2.29)
def test_binary_shortest_distance():
''' Check shortest distance '''
num_nodes = 5
for bckd in backends:
nngt.use_backend(bckd)
# UNDIRECTED
edge_list = [
(0, 3), (1, 0), (1, 2), (2, 4), (4, 1), (4, 3), (4, 0)
]
g = nngt.Graph(num_nodes, directed=False)
g.new_edges(edge_list)
mat_dist = na.shortest_distance(g)
undirected_dist = np.array([
[0, 1, 2, 1, 1],
[1, 0, 1, 2, 1],
[2, 1, 0, 2, 1],
[1, 2, 2, 0, 1],
[1, 1, 1, 1, 0],
])
assert np.array_equal(mat_dist, undirected_dist)
# undirected, sources
mat_dist = na.shortest_distance(g, sources=[0, 1, 2])
assert np.array_equal(mat_dist, undirected_dist[:3])
# undirected targets
mat_dist = na.shortest_distance(g, targets=[0, 1, 2])
assert np.array_equal(mat_dist[:, :3], undirected_dist[:, :3])
# single source/target
assert na.shortest_distance(g, sources=0, targets=2) == 2
# DIRECTED
g = nngt.Graph(num_nodes, directed=True)
edge_list = [
(0, 3), (1, 0), (1, 2), (2, 4), (4, 1), (4, 3), (4, 2), (4, 0)
]
g.new_edges(edge_list)
directed_dist = np.array([
[0., np.inf, np.inf, 1., np.inf],
[1., 0., 1., 2., 2. ],
[2., 2., 0., 2., 1. ],
[np.inf, np.inf, np.inf, 0., np.inf],
[ 1., 1., 1., 1., 0. ]
])
mat_dist = na.shortest_distance(g)
assert np.array_equal(mat_dist, directed_dist)
# check undirected from directed
mat_dist = na.shortest_distance(g, directed=False)
assert np.array_equal(mat_dist, undirected_dist)
# single source
mat_dist = na.shortest_distance(g, sources=[0])
assert np.array_equal(mat_dist, directed_dist[:1].ravel())
# single target
mat_dist = na.shortest_distance(g, targets=0)
assert np.array_equal(mat_dist, directed_dist[:, 0].ravel())
# single source/target directed
assert np.isinf(na.shortest_distance(g, sources=0, targets=2))
def test_weighted_shortest_distance():
''' Check shortest distance '''
for bckd in backends:
nngt.use_backend(bckd)
num_nodes = 5
edge_list = [
(0, 3), (1, 0), (1, 2), (2, 4), (4, 1), (4, 3), (4, 0)
]
weights = [2., 1., 1., 3., 2., 3., 2.]
# UNDIRECTED
g = nngt.Graph(num_nodes, directed=False)
g.new_edges(edge_list)
mat_dist = na.shortest_distance(g, weights=weights)
undirected_dist = np.array([
[0, 1, 2, 2, 2],
[1, 0, 1, 3, 2],
[2, 1, 0, 4, 3],
[2, 3, 4, 0, 3],
[2, 2, 3, 3, 0],
])
assert np.array_equal(mat_dist, undirected_dist)
# undirected, sources
g.set_weights(weights)
mat_dist = na.shortest_distance(g, sources=[0, 1, 2], weights='weight')
assert np.array_equal(mat_dist, undirected_dist[:3])
# undirected targets
mat_dist = na.shortest_distance(g, targets=[0, 1, 2], weights='weight')
assert np.array_equal(mat_dist[:, :3], undirected_dist[:, :3])
# single source/target
dist = na.shortest_distance(g, sources=3, targets=2, weights='weight')
assert dist == 4
# DIRECTED
g = nngt.Graph(num_nodes, directed=True)
g.new_edges(edge_list)
directed_dist = np.array([
[0., np.inf, np.inf, 2., np.inf],
[1., 0., 1., 3., 4. ],
[5., 5., 0., 6., 3. ],
[np.inf, np.inf, np.inf, 0., np.inf],
[ 2., 2., 3., 3., 0. ]
])
mat_dist = na.shortest_distance(g, weights=weights)
assert np.array_equal(mat_dist, directed_dist)
# check undirected from directed gives back undirected results
mat_dist = na.shortest_distance(g, directed=False, weights=weights)
assert np.array_equal(mat_dist, undirected_dist)
# single source
g.set_weights(weights)
mat_dist = na.shortest_distance(g, sources=[0], weights='weight')
assert np.array_equal(mat_dist, directed_dist[:1].ravel())
# single target
mat_dist = na.shortest_distance(g, targets=0, weights='weight')
assert np.array_equal(mat_dist, directed_dist[:, 0].ravel())
# single source/target directed
assert np.isinf(
na.shortest_distance(g, sources=0, targets=2, weights='weight'))
def test_binary_shortest_paths():
''' Test shortests paths with BFS algorithm '''
for bckd in backends:
nngt.use_backend(bckd)
num_nodes = 5
# UNDIRECTED
edge_list = [
(0, 1), (0, 3), (1, 2), (1, 4), (2, 3), (3, 4)
]
g = nngt.Graph(num_nodes, directed=False)
g.new_edges(edge_list)
assert na.shortest_path(g, 0, 0) == [0]
assert na.shortest_path(g, 0, 1) == [0, 1]
assert na.shortest_path(g, 0, 2) in ([0, 1, 2], [0, 3, 2])
count = 0
for p in na.all_shortest_paths(g, 4, 2):
assert p in ([4, 1, 2], [4, 3, 2])
count += 1
assert count == 2
count = 0
for p in na.all_shortest_paths(g, 1, 1):
assert p == [1]
count += 1
assert count == 1
# DIRECTED
edge_list = [
(0, 1), (0, 3), (1, 2), (1, 4), (3, 2), (4, 3)
]
g = nngt.Graph(num_nodes, directed=True)
g.new_edges(edge_list)
assert na.shortest_path(g, 0, 0) == [0]
assert na.shortest_path(g, 2, 4) == []
assert na.shortest_path(g, 0, 2) in ([0, 1, 2], [0, 3, 2])
count = 0
for p in na.all_shortest_paths(g, 4, 2):
assert p == [4, 3, 2]
count += 1
assert count == 1
count = 0
for p in na.all_shortest_paths(g, 1, 1):
assert p == [1]
count += 1
assert count == 1
def test_weighted_shortest_paths():
''' Test shortest paths with Dijsktra algorithm '''
for bckd in backends:
nngt.use_backend(bckd)
num_nodes = 5
# UNDIRECTED
edge_list = [
(0, 1), (0, 3), (1, 2), (1, 4), (2, 3), (3, 4)
]
weights = [5, 0.1, 3., 0.5, 1, 3.5]
g = nngt.Graph(num_nodes, directed=False)
g.new_edges(edge_list)
g.set_weights(weights)
assert na.shortest_path(g, 0, 0, weights='weight') == [0]
assert na.shortest_path(g, 0, 1, weights='weight') == [0, 3, 2, 1]
sp = na.shortest_path(g, 1, 3, weights=weights)
assert sp in ([1, 4, 3], [1, 2, 3])
count = 0
for p in na.all_shortest_paths(g, 1, 3, weights='weight'):
assert p in ([1, 4, 3], [1, 2, 3])
count += 1
assert count == 2
count = 0
for p in na.all_shortest_paths(g, 1, 1, weights='weight'):
assert p == [1]
count += 1
assert count == 1
# DIRECTED
edge_list = [
(0, 1), (0, 3), (1, 2), (1, 4), (3, 2), (4, 3)
]
weights = [1., 0.1, 0.1, 0.5, 1, 3.5]
g = nngt.Graph(num_nodes, directed=True)
g.new_edges(edge_list, attributes={"weight": weights})
assert na.shortest_path(g, 0, 0, weights='weight') == [0]
assert na.shortest_path(g, 2, 4, weights='weight') == []
assert na.shortest_path(g, 1, 3, weights=weights) == [1, 4, 3]
count = 0
for p in na.all_shortest_paths(g, 0, 2, weights='weight'):
assert p in ([0, 1, 2], [0, 3, 2])
count += 1
assert count == 2
count = 0
for p in na.all_shortest_paths(g, 1, 1, weights='weight'):
assert p == [1]
count += 1
assert count == 1
# UNDIRECTED FROM DIRECTED
weights = [5, 0.1, 3., 0.5, 1, 3.5]
# reset weights
g.set_weights(weights)
assert na.shortest_path(g, 0, 0, False, weights='weight') == [0]
assert na.shortest_path(
g, 0, 1, False, weights='weight') == [0, 3, 2, 1]
sp = na.shortest_path(g, 1, 3, False, weights=weights)
assert sp in ([1, 4, 3], [1, 2, 3])
count = 0
for p in na.all_shortest_paths(g, 1, 3, False, weights='weight'):
assert p in ([1, 4, 3], [1, 2, 3])
count += 1
assert count == 2
count = 0
for p in na.all_shortest_paths(g, 1, 1, False, weights='weight'):
assert p == [1]
count += 1
assert count == 1
def test_subgraph_centrality():
''' Check subgraph centrality with networkx implementation '''
num_nodes = 5
edge_list = [(0, 1), (0, 2), (0, 3), (1, 3), (2, 3), (2, 4), (3, 4)]
# this test requires networkx as backend
nngt.use_backend("networkx")
g = nngt.Graph(num_nodes, directed=False)
g.new_edges(edge_list)
# test full centralities
import networkx as nx
sc_nx = nx.subgraph_centrality(g.graph)
sc_nx = np.array([sc_nx[i] for i in range(num_nodes)])
sc_nngt = nngt.analysis.subgraph_centrality(g, weights=False,
normalize=False)
assert np.all(np.isclose(sc_nx, sc_nngt))
# test max_centrality
sc_nngt = nngt.analysis.subgraph_centrality(g, weights=False,
normalize="max_centrality")
assert np.all(np.isclose(sc_nx / sc_nx.max(), sc_nngt))
# test subpart
sc_nngt = nngt.analysis.subgraph_centrality(g, weights=False,
normalize=False, nodes=[0, 1])
assert np.all(np.isclose(sc_nx[:2], sc_nngt))
if __name__ == "__main__":
test_weighted_undirected_clustering()
test_assortativity()
test_closeness()
test_betweenness()
test_components()
test_diameter()
test_binary_shortest_distance()
test_weighted_shortest_distance()
test_binary_shortest_paths()
test_weighted_shortest_paths()
test_subgraph_centrality()