461 lines
14 KiB
Python
Executable File
461 lines
14 KiB
Python
Executable File
# -*- coding: utf-8 -*-
|
|
# SPDX-FileCopyrightText: 2015-2023 Tanguy Fardet
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
# testing/test_attributes.py
|
|
|
|
|
|
"""
|
|
Test the validity of graph, node, and edge attributes as well as the
|
|
distribution generators.
|
|
"""
|
|
|
|
import os
|
|
import unittest
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
import nngt
|
|
|
|
from base_test import TestBasis, XmlHandler, network_dir
|
|
from tools_testing import foreach_graph
|
|
|
|
|
|
# ---------- #
|
|
# Test tools #
|
|
# ---------- #
|
|
|
|
def _results_theo(instruct):
|
|
di_param = instruct["weights"]
|
|
if (di_param["distribution"] == "uniform"
|
|
or "corr" in di_param["distribution"]):
|
|
return di_param["lower"], di_param["upper"]
|
|
elif di_param["distribution"] == "gaussian":
|
|
return di_param["avg"], di_param["std"]
|
|
elif di_param["distribution"] == "lognormal":
|
|
return di_param["position"], di_param["scale"]
|
|
else:
|
|
raise NotImplementedError("This distribution is not supported yet.")
|
|
|
|
|
|
def _results_exp(attrib, instruct):
|
|
di_param = instruct["weights"]
|
|
if (di_param["distribution"] == "uniform"
|
|
or "corr" in di_param["distribution"]):
|
|
return attrib.min(), attrib.max()
|
|
elif di_param["distribution"] == "gaussian":
|
|
return np.average(attrib), np.std(attrib)
|
|
elif di_param["distribution"] == "lognormal":
|
|
m = np.average(attrib)
|
|
v = np.var(attrib)
|
|
return np.log(m/np.sqrt(1+v/m**2)), np.sqrt(np.log(1+v/m**2))
|
|
else:
|
|
raise NotImplementedError("This distribution is not supported yet.")
|
|
|
|
|
|
# ---------- #
|
|
# Test class #
|
|
# ---------- #
|
|
|
|
class TestAttributes(TestBasis):

    '''
    Class testing node and edge attributes, as well as the weight and delay
    distributions of generated graphs.
    '''

    # passed as third positional argument (``rtol``) to ``np.allclose`` in
    # ``test_weights`` and ``test_delays``; only quoted in the error
    # messages of the other tests
    tolerance = 0.02

    @property
    def test_name(self):
        ''' Name identifying this group of tests. '''
        return "test_attributes"

    @unittest.skipIf(nngt.get_config('mpi'), 'Not checking for MPI')
    def gen_graph(self, graph_name):
        '''
        Generate the graph called `graph_name`.

        The options are read through ``self.parser.get_graph_options``
        (``self.parser`` is presumably provided by ``TestBasis`` -- confirm
        in ``base_test``).

        Returns
        -------
        graph, di_instructions : the generated graph, named `graph_name`,
        and the instructions used to generate it.
        '''
        di_instructions = self.parser.get_graph_options(graph_name)
        graph = nngt.generate(di_instructions)
        graph.set_name(graph_name)
        return graph, di_instructions

    def test_node_attr(self):
        '''
        Check that a user-defined "double" node attribute is returned
        unchanged after having been set.
        '''
        g = nngt.Graph(100)

        ref_result = np.random.uniform(-1, 4, g.node_nb())
        g.set_node_attribute("nud", values=ref_result, value_type="double")

        computed_result = g.get_node_attributes(name="nud")

        self.assertTrue(
            np.allclose(ref_result, computed_result),
            '''Error on graph {}: unequal 'nud' attribute for tolerance {}.
            '''.format(g.name, self.tolerance))

    def test_nattr_default_values(self):
        '''
        Check the values taken by node attributes for nodes that were
        created without specifying them (NaN for doubles, empty string for
        strings, 0 for ints, None for objects).
        '''
        g2 = nngt.Graph()

        # add a new node with attributes
        attributes = {
            'size': 2.,
            'color': 'blue',
            'a': 5,
            'blob': [],
        }

        attribute_types = {
            'size': 'double',
            'color': 'string',
            'a': 'int',
            'blob': 'object',
        }

        g2.new_node(attributes=attributes, value_types=attribute_types)
        # two nodes without any attribute, then three with only two of them
        g2.new_node(2)
        g2.new_node(3, attributes={'size': [4., 5., 1.],
                                   'color': ['r', 'g', 'b']},
                    value_types={'size': 'double', 'color': 'string'})

        # check all values
        # for the doubles:
        # NaN == NaN is false, so we need to check separately equality between
        # non-NaN entries and position of NaN entries
        # (``np.nan`` and not ``np.NaN``: the latter was removed in NumPy 2.0)
        double_res = np.array([2., np.nan, np.nan, 4., 5., 1.])

        isnan1 = np.isnan(g2.node_attributes['size'])
        isnan2 = np.isnan(double_res)

        self.assertTrue(np.all(isnan1 == isnan2))
        self.assertTrue(
            np.all(np.isclose(
                g2.node_attributes['size'][~isnan1], double_res[~isnan2]))
        )

        # for the others, just compare the lists
        self.assertEqual(
            g2.node_attributes['color'].tolist(),
            ['blue', '', '', 'r', 'g', 'b'])
        self.assertEqual(
            g2.node_attributes['a'].tolist(), [5, 0, 0, 0, 0, 0])
        self.assertEqual(
            g2.node_attributes['blob'].tolist(),
            [[], None, None, None, None, None])

    def test_user_defined(self):
        '''
        Check that a user-defined "double" edge attribute set from an array
        of values is returned unchanged.
        '''
        avg = 50
        std = 6
        g = nngt.generation.gaussian_degree(avg, std, nodes=200)

        ref_result = np.random.uniform(0, 5, g.edge_nb())
        g.set_edge_attribute("ud", values=ref_result, value_type="double")

        computed_result = g.get_edge_attributes(name="ud")

        self.assertTrue(
            np.allclose(ref_result, computed_result),
            '''Error on graph {}: unequal 'ud' attribute for tolerance {}.
            '''.format(g.name, self.tolerance))

    def test_user_defined2(self):
        '''
        Check that a user-defined "double" edge attribute set from a single
        value (`val`) takes that value on every edge.
        '''
        avg = 50
        std = 6
        g = nngt.generation.gaussian_degree(avg, std, nodes=200)

        ref_result = np.full(g.edge_nb(), 4.)
        g.set_edge_attribute("ud2", val=4., value_type="double")

        computed_result = g.get_edge_attributes(name="ud2")

        self.assertTrue(
            np.allclose(ref_result, computed_result),
            '''Error on graph {}: unequal 'ud2' attribute for tolerance {}.
            '''.format(g.name, self.tolerance))

    @unittest.skipIf(nngt.get_config('mpi'), 'Not checking for MPI')
    def test_list_attributes(self):
        '''
        For list attributes, test that they are preserved as lists, and that
        some nodes or edges do not own references to the same list.
        '''
        avg = 25
        std = 3

        graph = nngt.generation.gaussian_degree(avg, std, nodes=1000)

        # --------------- #
        # node attributes #
        # --------------- #

        graph.new_node_attribute("nlist", value_type="object", val=[])

        nodes = list(range(8, 49))
        graph.set_node_attribute("nlist", val=[1], nodes=nodes)

        # update a fraction of the previously updated nodes
        nodes = list(range(0, 41))

        # to update the values, we need to get them to update the lists
        nlists = graph.get_node_attributes(name="nlist", nodes=nodes)

        for node_list in nlists:
            node_list.append(2)

        graph.set_node_attribute("nlist", values=nlists, nodes=nodes)

        # check that all lists are present
        nlists = graph.get_node_attributes(name="nlist")

        res = np.unique(np.array([[], [1], [2], [1, 2]], dtype=object))

        self.assertTrue(np.all(np.unique(nlists) == res))

        # check that all nodes from 0 to 48 were updated
        self.assertNotIn([], nlists[:49].tolist())

        # --------------- #
        # edge attributes #
        # --------------- #

        graph.new_edge_attribute("elist", value_type="object", val=[])

        nodes = list(range(8, 49))
        edges = graph.get_edges(source_node=nodes, target_node=nodes)
        graph.set_edge_attribute("elist", val=[1], edges=edges)

        # update a fraction of the previously updated edges
        nodes = list(range(0, 41))
        edges2 = graph.get_edges(source_node=nodes, target_node=nodes)

        # to update the values, we need to get them to update the lists
        elists = graph.get_edge_attributes(name="elist", edges=edges2)

        for edge_list in elists:
            edge_list.append(2)

        graph.set_edge_attribute("elist", values=elists, edges=edges2)

        # check that all lists are present
        elists = graph.get_edge_attributes(name="elist")

        res = np.unique(np.array([[], [1], [2], [1, 2]], dtype=object))

        self.assertTrue(np.all(np.unique(elists) == res))

        # check that all edges were updated
        eattr1 = graph.get_edge_attributes(name="elist", edges=edges).tolist()
        eattr2 = graph.get_edge_attributes(name="elist", edges=edges2).tolist()

        self.assertNotIn([], eattr1)
        self.assertNotIn([], eattr2)

    @foreach_graph
    def test_weights(self, graph, instructions, **kwargs):
        '''
        When generating graphs with weights, check that the expected properties
        are indeed obtained.

        `graph` and `instructions` are presumably supplied by the
        ``foreach_graph`` decorator -- confirm in ``tools_testing``.
        '''
        ref_result = _results_theo(instructions)

        weights = graph.get_weights()
        computed_result = _results_exp(weights, instructions)

        # third positional argument of ``np.allclose`` is ``rtol``
        self.assertTrue(
            np.allclose(ref_result, computed_result, self.tolerance),
            '''Error on graph {}: unequal weights for tolerance {}.
            '''.format(graph.name, self.tolerance))

    @foreach_graph
    def test_delays(self, graph, instructions, **kwargs):
        '''
        Check that the delay distribution generated in NNGT conforms to what
        was instructed.

        The delays are drawn using the parameters declared for the weights.
        The corresponding check on the NEST side is not implemented yet (see
        the todo at the end of the test).
        '''
        # get the information from the weights
        di_distrib = instructions["weights"]
        distrib = di_distrib["distribution"]

        delays = graph.set_delays(distribution=distrib, parameters=di_distrib)

        ref_result = _results_theo(instructions)
        computed_result = _results_exp(delays, instructions)

        # third positional argument of ``np.allclose`` is ``rtol``
        self.assertTrue(
            np.allclose(ref_result, computed_result, self.tolerance),
            '''Error on graph {}: unequal delays for tolerance {}.
            '''.format(graph.name, self.tolerance))
        # @todo
        #~ if nngt._config['with_nest']:
        #~     from nngt.simulation import make_nest_network
        #~     gids = make_nest_network(graph)
|
|
|
|
|
|
# ---------------------- #
|
|
# Pytest formatted tests #
|
|
# ---------------------- #
|
|
|
|
@pytest.mark.mpi_skip
def test_str_attr():
    '''
    Check string attributes.

    String node and edge attributes are set, compared to the reference
    lists, saved to a file and loaded back, then modified on the loaded
    graph to check that the original graph is left untouched.
    '''
    g = nngt.Graph(5)

    # set node attribute
    node_names = ["aa", "b", "c", "dddd", "eee"]

    g.new_node_attribute("name", "string", values=node_names)

    # set edges
    edges = [(0, 1), (1, 3), (1, 4), (2, 0), (3, 2), (4, 1)]

    g.new_edges(edges)

    # set edge attribute (includes the empty string for the first edge)
    eattr = ["a"*i for i in range(len(edges))]

    g.new_edge_attribute("edata", "string", values=eattr)

    # check attributes
    assert list(g.node_attributes["name"]) == node_names
    assert list(g.edge_attributes["edata"]) == eattr

    # save and load string attributes
    current_dir = os.path.dirname(os.path.abspath(__file__))
    filename = os.path.join(current_dir, "g.el")

    try:
        g.to_file(filename)

        h = nngt.load_from_file(filename)

        assert list(h.node_attributes["name"]) == node_names
        assert list(h.edge_attributes["edata"]) == eattr
    finally:
        # never leave the file behind, even if saving, loading, or one of
        # the assertions failed
        if os.path.exists(filename):
            os.remove(filename)

    # change an attribute
    node_names[2] = "cc"
    h.set_node_attribute("name", values=node_names)

    assert not np.array_equal(h.node_attributes["name"],
                              g.node_attributes["name"])

    assert list(h.node_attributes["name"]) == node_names

    eattr[0] = "l"
    h.set_edge_attribute("edata", values=eattr)

    assert not np.array_equal(h.edge_attributes["edata"],
                              g.edge_attributes["edata"])

    assert list(h.edge_attributes["edata"]) == eattr
|
|
|
|
|
|
@pytest.mark.mpi_skip
def test_delays():
    '''
    Check delays correlated to the distance, then delays set explicitly
    from an array of values.
    '''
    dmin, dmax = 1., 8.

    delay_params = {
        "distribution": "lin_corr",
        "correl_attribute": "distance",
        "lower": dmin,
        "upper": dmax,
    }

    g = nngt.generation.distance_rule(
        200., nodes=100, avg_deg=10, delays=delay_params)

    # the extreme delays must match the requested bounds
    generated = g.get_delays()

    assert np.isclose(generated.min(), dmin)
    assert np.isclose(generated.max(), dmax)

    # set the delays by hand as an affine function of the distance
    slope = 0.003
    distances = g.edge_attributes["distance"]

    g.set_delays(dmin + slope*distances)

    expected = dmin + slope*distances

    assert np.allclose(g.get_delays(), expected)
|
|
|
|
|
|
@pytest.mark.mpi_skip
def test_attributes_are_copied():
    '''
    Check that the attributes returned are a copy.

    Each returned array is shuffled in place; the values stored in the
    graph must not be affected.
    '''
    rng = np.random.default_rng()

    num_nodes, num_edges = 100, 1000

    init_weights = rng.uniform(0, 5, num_edges)

    g = nngt.generation.erdos_renyi(
        nodes=num_nodes, edges=num_edges, weights=init_weights)

    # check weights
    returned_w = g.get_weights()

    assert np.allclose(init_weights, returned_w)

    rng.shuffle(returned_w)

    assert np.allclose(init_weights, g.get_weights())
    assert not np.allclose(returned_w, g.get_weights())

    # check edge attribute
    edge_values = 2*returned_w

    g.new_edge_attribute("etest", "double", values=edge_values)

    returned_e = g.edge_attributes["etest"]

    assert np.allclose(returned_e, edge_values)

    rng.shuffle(returned_e)

    assert np.allclose(edge_values, g.edge_attributes["etest"])
    assert not np.allclose(edge_values, returned_e)

    # check node attribute
    node_values = rng.uniform(2, 3, num_nodes)

    g.new_node_attribute("ntest", "double", values=node_values)

    returned_n = g.node_attributes["ntest"]

    assert np.allclose(returned_n, node_values)

    rng.shuffle(returned_n)

    assert np.allclose(node_values, g.node_attributes["ntest"])
    assert not np.allclose(node_values, returned_n)
|
|
|
|
|
|
@pytest.mark.mpi_skip
def test_combined_attr():
    '''
    Check combined attributes.

    A directed graph containing a pair of reciprocal edges is converted to
    an undirected one, with a different combination rule for each edge
    attribute.
    '''
    g = nngt.Graph(3)

    # (1, 2) and (2, 1) are the reciprocal edges; (1, 1) is a self-loop
    edge_list = ((0, 1), (1, 1), (1, 2), (2, 1))
    g.new_edges(edge_list, check_self_loops=False)

    weights = (0.1, 1, 0.5, 0.2)
    distances = (2., 0., 0.3, 0.3)
    randoms = (0.8, 0.4, -0.5, 0.5)

    g.set_weights(weights)
    g.new_edge_attribute("distance", "double", distances)
    g.new_edge_attribute("rnd", "double", randoms)

    # one combination rule per attribute
    rules = {"weight": "max", "distance": "mean", "rnd": "sum"}

    undirected = g.to_undirected(rules)

    expected_weights = (0.1, 1, 0.5)
    expected_distances = (2, 0, 0.3)
    expected_randoms = (0.8, 0.4, 0)

    assert np.all(undirected.get_weights() == expected_weights)
    assert np.all(undirected.edge_attributes["distance"] == expected_distances)
    assert np.all(undirected.edge_attributes["rnd"] == expected_randoms)
|
|
|
|
|
|
# ---------- #
|
|
# Test suite #
|
|
# ---------- #
|
|
|
|
# the unittest suite is only built when MPI is not in use
if not nngt.get_config('mpi'):
    suite = unittest.TestLoader().loadTestsFromTestCase(TestAttributes)

if __name__ == "__main__":
    # ``exit=False`` is required here: by default ``unittest.main`` calls
    # ``sys.exit`` once the tests have run, which made the pytest-formatted
    # tests below unreachable
    program = unittest.main(exit=False)

    test_str_attr()
    test_delays()
    test_attributes_are_copied()
    test_combined_attr()

    # propagate the outcome of the unittest run as the exit status
    raise SystemExit(0 if program.result.wasSuccessful() else 1)
|