NNGT/testing/test_generation2.py

583 lines
16 KiB
Python

# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2015-2023 Tanguy Fardet
# SPDX-License-Identifier: GPL-3.0-or-later
# testing/test_generation2.py
# This file is part of the NNGT module
# Distributed as a free software, in the hope that it will be useful, under the
# terms of the GNU General Public License.
"""
Test the new methods of the :mod:`~nngt.generation` module.
"""
import os
import numpy as np
import pytest
import nngt
import nngt.analysis as na
import nngt.generation as ng
if os.environ.get("MPI"):
nngt.set_config("mpi", True)
@pytest.mark.mpi_skip
def test_from_degree_list():
'''
Check that the degrees generated using `from_degree_list` indeed
correspond to the provided list
'''
num_nodes = 1000
deg_list = np.random.randint(0, 100, size=num_nodes)
# test for in
g = ng.from_degree_list(deg_list, degree_type="in", nodes=num_nodes)
assert np.array_equal(g.get_degrees("in"), deg_list)
assert g.edge_nb() == np.sum(deg_list)
# test for out
g = ng.from_degree_list(deg_list, degree_type="out", nodes=num_nodes)
assert np.array_equal(g.get_degrees("out"), deg_list)
assert g.edge_nb() == np.sum(deg_list)
# test for total-degree
deg_list = 2*np.random.randint(0, 50, size=num_nodes)
try:
g = ng.from_degree_list(deg_list, degree_type="total", nodes=num_nodes,
directed=True)
assert np.array_equal(g.get_degrees("total"), deg_list)
assert g.edge_nb() == int(0.5*np.sum(deg_list))
except ValueError:
# non-graphical sequence was provided
print("Skipping non graphical sequence for 'total-degree'.")
# test for undirected
deg_list = 2*np.random.randint(0, 50, size=num_nodes)
try:
g = ng.from_degree_list(deg_list, nodes=num_nodes, directed=False)
assert np.all(g.get_degrees("total") == deg_list)
assert g.edge_nb() == int(0.5*np.sum(deg_list))
except ValueError:
# non-graphical sequence was provided
print("Skipping non graphical sequence for undirected graph.")
@pytest.mark.mpi_skip
def test_newman_watts():
'''
Check the newman_watts generation method.
'''
num_nodes = 5
k_lattice = 2
p_shortcut = 0.2
## USING EDGES
# undirected
g = ng.newman_watts(k_lattice, edges=6, nodes=num_nodes, directed=False)
lattice_edges = [(0, 1), (0, 4), (1, 2), (2, 3), (3, 4)]
for e in lattice_edges:
assert g.has_edge(e)
assert g.edge_nb() == 6 # min_edges + one shortcut
# directed
reciprocity = 0.
g = ng.newman_watts(k_lattice, reciprocity_circular=reciprocity, edges=6,
nodes=num_nodes, directed=True)
assert g.edge_nb() == 6 # 5 lattice edge + one shortcut
assert 0. <= na.reciprocity(g) <= 1/3.
reciprocity = 1.
g = ng.newman_watts(k_lattice, reciprocity_circular=reciprocity, edges=12,
nodes=num_nodes, directed=True)
assert g.edge_nb() == 12 # 10 lattice edges + 2 shortcuts
assert 5/6. <= na.reciprocity(g) <= 1
reciprocity = 0.5
g = ng.newman_watts(k_lattice, reciprocity_circular=reciprocity, edges=8,
nodes=num_nodes, directed=True)
assert g.edge_nb() == 8 # 7 lattice edges + 1 shortcuts
assert 0.5 <= na.reciprocity(g) <= 0.75
## USING PROBABILITY
# undirected
g = ng.newman_watts(k_lattice, p_shortcut, nodes=num_nodes, directed=False)
assert 0.5*k_lattice*num_nodes <= g.edge_nb() <= k_lattice*num_nodes
# directed
reciprocity = 0.
g = ng.newman_watts(k_lattice, p_shortcut, reciprocity, nodes=num_nodes,
directed=True)
assert 0.5*k_lattice*num_nodes <= g.edge_nb() <= k_lattice*num_nodes
reciprocity = 1.
g = ng.newman_watts(k_lattice, p_shortcut, reciprocity, nodes=num_nodes,
directed=True)
assert k_lattice*num_nodes <= g.edge_nb() <= 2*k_lattice*num_nodes
reciprocity = 0.5
g = ng.newman_watts(k_lattice, p_shortcut, reciprocity, nodes=num_nodes,
directed=True)
recip_fact = 0.5*(1 + reciprocity / (2 - reciprocity))
min_edges = int(np.round(recip_fact*k_lattice*num_nodes))
assert min_edges <= g.edge_nb() <= 2*recip_fact*k_lattice*num_nodes
@pytest.mark.mpi_skip
def test_watts_strogatz():
'''
Check the watts_strogatz generation method.
'''
num_nodes = 5
k_lattice = 2
p_shortcut = 0.2
# undirected
g = ng.watts_strogatz(k_lattice, p_shortcut, nodes=num_nodes,
directed=False)
assert g.edge_nb() == int(0.5*k_lattice*num_nodes)
# directed
reciprocity = 0.
g = ng.watts_strogatz(k_lattice, p_shortcut, reciprocity, nodes=num_nodes,
directed=True)
assert g.edge_nb() == 0.5*k_lattice*num_nodes
reciprocity = 1.
g = ng.watts_strogatz(k_lattice, p_shortcut, reciprocity, nodes=num_nodes,
shuffle="sources", directed=True)
assert g.edge_nb() == k_lattice*num_nodes
assert np.all(g.get_degrees("in") == k_lattice)
g = ng.watts_strogatz(k_lattice, p_shortcut, reciprocity, nodes=num_nodes,
shuffle="targets", directed=True)
assert np.all(g.get_degrees("out") == k_lattice)
reciprocity = 0.5
g = ng.watts_strogatz(k_lattice, p_shortcut, reciprocity, nodes=num_nodes,
directed=True)
recip_fact = 0.5*(1 + reciprocity / (2 - reciprocity))
assert g.edge_nb() == int(np.round(recip_fact*k_lattice*num_nodes))
# limit cases
for p_shortcut in (0, 1):
g = ng.watts_strogatz(k_lattice, p_shortcut, nodes=num_nodes,
directed=False)
assert g.edge_nb() == 0.5*k_lattice*num_nodes
g = ng.watts_strogatz(k_lattice, p_shortcut, nodes=num_nodes,
directed=True)
assert g.edge_nb() == k_lattice*num_nodes
@pytest.mark.mpi
def test_mpi_from_degree_list():
'''
Check that the degrees generated using `from_degree_list` indeed
correspond to the provided list
'''
num_nodes = 1000
from mpi4py import MPI
comm = MPI.COMM_WORLD
deg_list = np.random.randint(0, int(0.1*num_nodes), size=num_nodes)
# test for in
g = ng.from_degree_list(deg_list, degree_type="in", nodes=num_nodes)
deg = g.get_degrees("in")
deg = comm.gather(deg, root=0)
if nngt.on_master_process():
deg = np.sum(deg, axis=0)
assert np.all(deg == deg_list)
num_edges = g.edge_nb()
num_edges = comm.gather(num_edges, root=0)
if nngt.on_master_process():
num_edges = np.sum(num_edges)
assert num_edges == np.sum(deg_list)
# test for out
g = ng.from_degree_list(deg_list, degree_type="out", nodes=num_nodes)
deg = g.get_degrees("out")
deg = comm.gather(deg, root=0)
if nngt.on_master_process():
deg = np.sum(deg, axis=0)
assert np.all(deg == deg_list)
num_edges = g.edge_nb()
num_edges = comm.gather(num_edges, root=0)
if nngt.on_master_process():
num_edges = np.sum(num_edges)
assert num_edges == np.sum(deg_list)
@pytest.mark.mpi_skip
def test_total_undirected_connectivities():
''' Test total-degree connectivities '''
num_nodes = 1000
# erdos-renyi
density = 0.1
lower, upper = 0.3, 5.4
weights = {"distribution": "uniform", "lower": lower, "upper": upper}
g = ng.erdos_renyi(density=density, nodes=num_nodes, directed=False,
weights=weights)
assert g.edge_nb() / (num_nodes*num_nodes) == density
# check weights
ww = g.get_weights()
assert np.all((lower <= ww) * (ww <= upper))
# check other graph types
for directed in (True, False):
# fixed-degree
deg = 50
g = ng.fixed_degree(deg, "total", nodes=num_nodes, directed=directed)
assert {deg} == set(g.get_degrees())
# gaussian degree
avg = 50.
std = 5.
g = ng.gaussian_degree(avg, std, degree_type="total", nodes=num_nodes,
directed=directed)
deviation = 20. / np.sqrt(num_nodes)
average = np.average(g.get_degrees())
assert avg - deviation <= average <= avg + deviation
@pytest.mark.mpi_skip
def test_all_to_all():
''' Test all-to-all connection scheme '''
num_nodes = 4
# via direct generation call
g = ng.all_to_all(nodes=num_nodes, directed=False)
assert np.array_equal(
g.edges_array, [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)])
g = ng.all_to_all(nodes=num_nodes, directed=True)
assert np.array_equal(
g.edges_array, [(0, 1), (0, 2), (0, 3), (1, 0), (1, 2), (1, 3),
(2, 0), (2, 1), (2, 3), (3, 0), (3, 1), (3, 2)])
# via connector call
g = nngt.Graph(num_nodes)
ng.connect_nodes(g, [0, 1], [2, 3], "all_to_all")
assert np.array_equal(g.edges_array, [(0, 2), (0, 3), (1, 2), (1, 3)])
g = nngt.Graph(num_nodes)
ng.connect_nodes(g, [0, 1], [1, 2, 3], "all_to_all")
assert np.array_equal(g.edges_array,
[(0, 1), (0, 2), (0, 3), (1, 2), (1, 3)])
@pytest.mark.mpi_skip
def test_distances():
''' Check that distances are properly generated for SpatialGraphs '''
# simple graph
num_nodes = 4
pos = [(0, 0), (1, 0), (2, 0), (3, 0)]
g = nngt.SpatialGraph(num_nodes, positions=pos)
edges = [(0, 1), (0, 3), (1, 2), (2, 3)]
g.new_edges(edges)
dist = g.edge_attributes["distance"]
expected = np.abs(np.diff(g.edges_array, axis=1)).ravel()
assert np.array_equal(dist, expected)
g.new_node(positions=[(4, 0)])
g.new_edge(1, 4)
assert g.get_edge_attributes((1, 4), "distance") == 3
# distance rule
g = ng.distance_rule(2.5, rule="lin", nodes=num_nodes, avg_deg=2,
positions=pos)
dist = g.edge_attributes["distance"]
expected = np.abs(np.diff(g.edges_array, axis=1)).ravel()
assert np.array_equal(dist, expected)
assert np.all(dist < 3)
# using the connector functions
num_nodes = 20
pop = nngt.NeuralPop.exc_and_inhib(num_nodes)
pos = np.array([(i, 0) for i in range(num_nodes)])
net = nngt.SpatialNetwork(pop, positions=pos)
inh = pop["inhibitory"]
exc = pop["excitatory"]
ng.connect_groups(net, exc, pop, "erdos_renyi", avg_deg=5)
ng.connect_groups(net, inh, pop, "random_scale_free", in_exp=2.1,
out_exp=2.1, avg_deg=5)
dist = net.edge_attributes["distance"]
expected = np.abs(np.diff(net.edges_array, axis=1)).ravel()
assert np.array_equal(dist, expected)
@pytest.mark.mpi_skip
def test_price():
''' Test Price network '''
# directed
m = 5
g = ng.price_scale_free(m, nodes=100)
in_degrees = g.get_degrees("in")
out_degrees = g.get_degrees("out")
assert set(out_degrees) == {i for i in range(m + 1)}
assert in_degrees.min() == 0
# undirected
g = ng.price_scale_free(m, nodes=100, directed=False)
degrees = g.get_degrees()
assert np.all(degrees >= m)
# reciprocity
g = ng.price_scale_free(m, nodes=100, reciprocity=1)
assert np.all(degrees >= m)
assert na.reciprocity(g) == 1
r = 0.3
g = ng.price_scale_free(m, nodes=100, reciprocity=r)
E = g.edge_nb()
Er = 2 * r * E / (1 + r)
rmin = (Er - 4*np.sqrt(Er)) / E
rmax = (Er + 4*np.sqrt(Er)) / E
assert rmin < na.reciprocity(g) < rmax
@pytest.mark.mpi_skip
def test_connect_switch_distance_rule_max_proba():
num_omp = nngt.get_config("omp")
mthread = nngt.get_config("multithreading")
# switch multithreading to False
nngt.set_config("multithreading", False)
pop = nngt.NeuralPop.exc_and_inhib(1000)
radius = 100.
shape = nngt.geometry.Shape.disk(radius)
net = nngt.SpatialNetwork(population=pop, shape=shape)
max_proba = 0.1
avg, std = 10., 1.5
weights = {"distribution": "gaussian", "avg": avg, "std": std}
ng.connect_nodes(net, pop.inhibitory, pop.excitatory, "distance_rule",
scale=5*radius, max_proba=max_proba, weights=weights)
assert net.edge_nb() <= len(pop.inhibitory)*len(pop.excitatory)*max_proba
# check weights
ww = net.get_weights()
assert avg - 0.5*std < ww.mean() < avg + 0.5*std
assert 0.75*std < ww.std() < 1.25*std
# restore mt parameters
nngt.set_config("mpi", False)
nngt.set_config("omp", num_omp)
nngt.set_config("multithreading", mthread)
@pytest.mark.mpi_skip
def test_circular():
''' Test the circular graph generation methods. '''
num_nodes = 1000
coord_nb = 4
# undirected
gc = ng.circular(coord_nb, nodes=num_nodes, directed=False)
assert gc.node_nb() == num_nodes
assert gc.edge_nb() == int(0.5*num_nodes*coord_nb)
assert np.array_equal(gc.get_degrees(), np.full(num_nodes, coord_nb))
# directed (reciprocity one)
gc = ng.circular(coord_nb, nodes=num_nodes)
assert gc.node_nb() == num_nodes
assert gc.edge_nb() == int(num_nodes*coord_nb)
assert np.array_equal(gc.get_degrees(), np.full(num_nodes, 2*coord_nb))
assert np.array_equal(gc.get_degrees("in"), np.full(num_nodes, coord_nb))
assert np.array_equal(gc.get_degrees("out"), np.full(num_nodes, coord_nb))
# directed reciprocity = 0.5
recip = 0.5
gc = ng.circular(coord_nb, nodes=num_nodes, reciprocity=recip)
num_edges = int(np.round(
0.5 * (1 + recip / (2 - recip)) * num_nodes * coord_nb))
num_recip = 2 * int(np.round(
0.5 * recip / (2 - recip) * num_nodes * coord_nb))
assert gc.node_nb() == num_nodes
assert gc.edge_nb() == num_edges
assert np.isclose(na.reciprocity(gc), num_recip / num_edges)
assert np.isclose(na.reciprocity(gc), recip, 1e-3)
# directed reciprocity = 0.5
recip = 0.3
gc = ng.circular(coord_nb, nodes=num_nodes, reciprocity=recip)
num_edges = int(np.round(
0.5 * (1 + recip / (2 - recip)) * num_nodes * coord_nb))
num_recip = 2 * int(np.round(
0.5 * recip / (2 - recip) * num_nodes * coord_nb))
assert gc.node_nb() == num_nodes
assert gc.edge_nb() == num_edges
assert np.isclose(na.reciprocity(gc), num_recip / num_edges)
assert np.isclose(na.reciprocity(gc), recip, 1e-3)
@pytest.mark.mpi_skip
def test_sparse_clustered():
ccs = np.linspace(0.1, 0.9, 4)
num_nodes = 500
degrees = [10, 40]
methods = ["star-component", "sequential", "random", "central-node"]
for directed in (True, False):
# check errors
with pytest.raises(ValueError):
g = ng.sparse_clustered(0, nodes=num_nodes, avg_deg=10,
directed=directed)
with pytest.raises(RuntimeError):
g = ng.sparse_clustered(1, nodes=num_nodes, avg_deg=10,
directed=directed, rtol=1e-10)
# check graphs
for i, c in enumerate(ccs):
for deg in degrees:
if c*num_nodes > deg:
g = ng.sparse_clustered(
c, nodes=num_nodes, avg_deg=deg, connected=False,
directed=directed, rtol=0.11)
g = ng.sparse_clustered(
c, nodes=num_nodes, avg_deg=deg, directed=directed,
exact_edge_nb=True)
assert g.edge_nb() == deg*num_nodes
g = ng.sparse_clustered(
c, nodes=num_nodes, avg_deg=deg, directed=directed,
connected=True, method=methods[i])
assert g.is_connected()
if __name__ == "__main__":
if not nngt.get_config("mpi"):
test_circular()
test_newman_watts()
test_from_degree_list()
test_total_undirected_connectivities()
test_watts_strogatz()
test_all_to_all()
test_distances()
test_price()
test_connect_switch_distance_rule_max_proba()
test_sparse_clustered()
if nngt.get_config("mpi"):
test_mpi_from_degree_list()