344 lines
10 KiB
Python
344 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
|
# SPDX-FileCopyrightText: 2015-2023 Tanguy Fardet
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
# testing/test_io.py
|
|
|
|
# This file is part of the NNGT module
|
|
# Distributed as a free software, in the hope that it will be useful, under the
|
|
# terms of the GNU General Public License.
|
|
|
|
"""
|
|
Test the IO functions.
|
|
"""
|
|
|
|
import os
|
|
import unittest
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
import nngt
|
|
from base_test import TestBasis, XmlHandler, network_dir
|
|
from tools_testing import foreach_graph
|
|
|
|
|
|
# --------------------- #
|
|
# File creation/removal #
|
|
# --------------------- #
|
|
|
|
# directory containing this test file, with a trailing slash so that file
# names can be appended directly
current_dir = os.path.dirname(os.path.abspath(__file__)) + '/'

# two-stage template: the first ``format`` call fills in ``graph`` and turns
# ``{{val}}`` into ``{val}``, which a second ``format`` call then fills in
error = 'Wrong {{val}} for {graph}.'

# values passed as ``fmt`` when saving/loading
formats = ("neighbour", "edge_list", "gml", "graphml")

# file extensions used when the format is not given explicitly
filetypes = ("nn", "el", "gml", "graphml")

# scratch file shared by the function-based tests
gfilename = current_dir + 'g.graph'
|
|
|
|
|
|
def teardown_function(function):
    '''
    Remove the scratch graph file written by the function-based tests.

    Parameters
    ----------
    function : callable
        Test function that was just run; it is not used here (the argument
        is presumably supplied by pytest's module-level teardown hook).
    '''
    if nngt.get_config("mpi"):
        from mpi4py import MPI
        comm = MPI.COMM_WORLD.Clone()

        # synchronize the processes before the file is deleted
        comm.Barrier()

    try:
        os.remove(gfilename)
    except OSError:
        # best-effort cleanup: the file may never have been created or may
        # already have been removed
        pass
|
|
|
|
|
|
# ---------- #
|
|
# Test class #
|
|
# ---------- #
|
|
|
|
class TestIO(TestBasis):

    '''
    Class testing saving and loading functions.
    '''

    @classmethod
    def tearDownClass(cls):
        '''
        Delete the files written by the tests of this class.

        This covers the ``<graph name>.el`` files saved by :func:`gen_graph`
        and the ``test.<filetype>`` files saved by
        :func:`test_custom_attributes`. Each file is removed independently,
        so a missing file does not prevent the removal of the others.
        '''
        leftovers = [
            current_dir + graphname + '.el' for graphname in cls.graphs
        ]
        leftovers.extend(current_dir + 'test.' + ft for ft in filetypes)

        for path in leftovers:
            try:
                os.remove(path)
            except OSError:
                # best-effort cleanup: the file may not exist
                pass

    @property
    def test_name(self):
        ''' Name identifying this group of tests. '''
        return "test_io"

    @unittest.skipIf(nngt.get_config('mpi'), 'Not checking for MPI')
    def gen_graph(self, graph_name):
        '''
        Load or generate the graph associated to `graph_name`.

        Parameters
        ----------
        graph_name : str
            Name of the graph; a name containing a dot is treated as a file
            located in `network_dir`.

        Returns
        -------
        graph, instructions : tuple
            The graph together with the options used to generate it, or
            ``None`` instead of the options if the graph was loaded from a
            file.
        '''
        di_instructions = self.parser.get_graph_options(graph_name)

        # check whether we are loading from file
        if "." in graph_name:
            abspath = network_dir + graph_name
            graph = nngt.Graph.from_file(abspath, **di_instructions,
                                         cleanup=True)
            graph.set_name(graph_name)
            return graph, None

        graph = nngt.generate(di_instructions)
        graph.set_name(graph_name)

        # saved here so that ``test_identical`` can reload it
        graph.to_file(current_dir + graph_name + '.el')

        return graph, di_instructions

    @foreach_graph
    def test_identical(self, graph, instructions, **kwargs):
        '''
        Test that the generated graph and the one loaded from the saved file
        are indeed identical.

        For a generated graph (`instructions` is not ``None``), the file
        written by :func:`gen_graph` is reloaded and node number, edge
        number, positions (for spatial graphs) and edge attributes are
        compared. For a graph loaded from a file, node number, edge number
        and directedness are compared to the expected results.
        '''
        err = error.format(graph=graph.name)

        if instructions is not None:  # working with generated graph
            # load graph
            h = nngt.Graph.from_file(current_dir + graph.name + '.el')

            # test properties
            self.assertEqual(h.node_nb(), graph.node_nb(),
                             err.format(val='node number'))
            self.assertEqual(h.edge_nb(), graph.edge_nb(),
                             err.format(val='edge number'))

            if graph.is_spatial():
                self.assertTrue(np.allclose(h.get_positions(),
                                            graph.get_positions()),
                                err.format(val='positions'))

            for attr, values in graph.edge_attributes.items():
                # different results probably because of rounding problems
                # note, here we are using the edge list format so edge order
                # is the same in both graphs
                new_val = h.get_edge_attributes(name=attr)
                allclose = np.allclose(new_val, values, rtol=1e-4)

                if not allclose:
                    print("Error: expected")
                    print(values)
                    print("but got")
                    print(new_val)
                    print("max error is: {}".format(
                        np.max(np.abs(np.subtract(new_val, values)))))

                self.assertTrue(allclose, err.format(val=attr))
        else:  # working with loaded graph
            nodes = self.get_expected_result(graph, "nodes")
            edges = self.get_expected_result(graph, "edges")
            directed = self.get_expected_result(graph, "directed")

            # check
            self.assertEqual(
                nodes, graph.node_nb(), err.format(val='node number'))
            self.assertEqual(
                edges, graph.edge_nb(), err.format(val='edge number'))
            self.assertEqual(directed, graph.is_directed(),
                             err.format(val='directedness'))

    @unittest.skipIf(nngt.get_config('mpi'), 'Not checking for MPI')
    def test_custom_attributes(self):
        '''
        Test that custom attributes are saved and loaded correctly

        A graph where every node sends `avg_deg` edges towards distinct,
        randomly chosen targets is built with a random signed integer edge
        attribute; it is then saved and reloaded for each entry of
        `filetypes` and the attribute values are compared.
        '''
        num_nodes = 100
        avg_deg = 10

        g = nngt.Graph(nodes=num_nodes)
        g.new_edge_attribute("test_attr", "int")

        for i in range(num_nodes):
            targets = np.random.choice(num_nodes, size=avg_deg, replace=False)

            elist = np.zeros((len(targets), 2), dtype=int)
            elist[:, 0] = i
            elist[:, 1] = targets

            # random magnitude multiplied by a random sign (-1 or +1)
            ids = np.random.randint(0, avg_deg*num_nodes, len(targets))
            ids *= 2*np.random.randint(0, 2, len(targets)) - 1

            g.new_edges(elist, attributes={"test_attr": ids},
                        check_duplicates=False, check_self_loops=False,
                        check_existing=False)

        old_edges = g.edges_array
        expected = g.get_edge_attributes(name="test_attr")

        for ft in filetypes:
            filename = current_dir + 'test.' + ft

            g.to_file(filename)
            h = nngt.Graph.from_file(filename)

            # for neighbour list, we need to give the edge list to have
            # the edge attributes in the same order as the original graph
            loaded = h.get_edge_attributes(edges=old_edges, name="test_attr")
            allclose = np.allclose(expected, loaded)

            if not allclose:
                print("Results differed for '{}'.".format(g.name))
                print("using file 'test.{}'.".format(ft))
                print(expected)
                print(loaded)

                # dump the file content to help with debugging
                with open(filename, 'r') as f:
                    for line in f:
                        print(line.strip())

            self.assertTrue(
                allclose,
                "Wrong 'test_attr' values using file 'test.{}'.".format(ft))
|
|
|
|
|
|
@pytest.mark.mpi_skip
def test_empty_out_degree():
    '''
    Save and reload a two-node graph containing the single edge (0, 1) in
    every format and check that the edges are unchanged.
    '''
    graph = nngt.Graph(2)
    graph.new_edge(0, 1)

    for file_format in formats:
        nngt.save_to_file(graph, gfilename, fmt=file_format)
        reloaded = nngt.load_from_file(gfilename, fmt=file_format)

        assert np.array_equal(graph.edges_array, reloaded.edges_array)
|
|
|
|
|
|
@pytest.mark.mpi_skip
def test_str_attributes():
    '''
    Check that string node and edge attributes, including values containing
    quotes and spaces, are the same after saving and reloading the graph in
    every format.
    '''
    graph = nngt.Graph(2)
    graph.new_edge(0, 1)

    # string attribute on the edge
    graph.new_edge_attribute("type", "string")
    graph.set_edge_attribute("type", val='odd')

    # string attribute on the nodes, with quote characters in the values
    graph.new_node_attribute("rnd", "string")
    graph.set_node_attribute("rnd", values=["s'adf", 'sd fr"'])

    for file_format in formats:
        nngt.save_to_file(graph, gfilename, fmt=file_format)
        reloaded = nngt.load_from_file(gfilename, fmt=file_format)

        assert np.array_equal(graph.edges_array, reloaded.edges_array)

        saved_types = graph.edge_attributes["type"]
        loaded_types = reloaded.edge_attributes["type"]
        assert np.array_equal(saved_types, loaded_types)

        saved_rnd = graph.node_attributes["rnd"]
        loaded_rnd = reloaded.node_attributes["rnd"]
        assert np.array_equal(saved_rnd, loaded_rnd)
|
|
|
|
|
|
@pytest.mark.mpi_skip
def test_structure():
    '''
    Check that the structure of a graph and the population of a network are
    equal to the original ones after saving and reloading in every format.
    '''
    # with a structure
    group_sizes = (25, 50, 40, 35)
    rooms = tuple(nngt.Group(size) for size in group_sizes)
    names = ["R1", "R2", "R3", "R4"]

    struct = nngt.Structure.from_groups(rooms, names)
    structured = nngt.Graph(structure=struct)

    for file_format in formats:
        structured.to_file(gfilename, fmt=file_format)
        reloaded = nngt.load_from_file(gfilename, fmt=file_format)

        assert structured.structure == reloaded.structure

    # with a neuronal population
    network = nngt.Network.exc_and_inhib(100)

    for file_format in formats:
        network.to_file(gfilename, fmt=file_format)
        reloaded = nngt.load_from_file(gfilename, fmt=file_format)

        assert network.population == reloaded.population
|
|
|
|
|
|
@pytest.mark.mpi_skip
def test_spatial():
    '''
    Check that positions, shape, and the areas of the shape (geometry and
    properties) of a spatial graph are recovered after saving and reloading
    in every format.
    '''
    from nngt.geometry import Shape

    disk = Shape.disk(100, default_properties={"plop": 0.2, "height": 1.})
    center = Shape.rectangle(10, 10, default_properties={"height": 10.})
    disk.add_area(center, name="center")

    graph = nngt.SpatialGraph(20, shape=disk)

    for file_format in formats:
        graph.to_file(gfilename, fmt=file_format)
        reloaded = nngt.load_from_file(gfilename, fmt=file_format)

        # geometric tolerance scaled by the average absolute coordinate of
        # the reloaded shape's exterior
        exterior = np.abs(reloaded.shape.exterior.coords)
        tolerance = np.average(exterior)*1e-5

        assert np.allclose(graph.get_positions(), reloaded.get_positions())

        saved_shape = graph.shape.normalize()
        loaded_shape = reloaded.shape.normalize()
        assert saved_shape.equals_exact(loaded_shape, tolerance)

        for name, saved_area in graph.shape.areas.items():
            loaded_area = reloaded.shape.areas[name]

            assert saved_area.normalize().equals_exact(
                loaded_area.normalize(), tolerance)

            assert saved_area.properties == loaded_area.properties
|
|
|
|
|
|
@pytest.mark.mpi_skip
def test_node_attributes():
    '''
    Check that an integer node attribute is unchanged after saving and
    reloading an undirected Erdos-Renyi graph in every format.
    '''
    num_nodes = 10

    graph = nngt.generation.erdos_renyi(
        nodes=num_nodes, avg_deg=3, directed=False)

    # even numbers 2, 4, ..., 2*num_nodes
    sizes = list(range(2, 2*num_nodes + 1, 2))
    graph.new_node_attribute("size", "int", sizes)

    for file_format in formats:
        graph.to_file(gfilename, fmt=file_format)
        reloaded = nngt.load_from_file(gfilename, fmt=file_format)

        saved_sizes = graph.node_attributes["size"]
        loaded_sizes = reloaded.node_attributes["size"]
        assert np.array_equal(saved_sizes, loaded_sizes)
|
|
|
|
|
|
@pytest.mark.mpi_skip
def test_partial_graphml():
    '''
    Check that the GraphML file loads fine if some attributes are missing.

    The expected values below use an empty string for the "name" of node 1
    and NaN for the "eattr" of edge (2, 3).
    '''
    g = nngt.load_from_file(os.path.join(current_dir,
                                         "Networks/missing_attrs.graphml"))

    nattrs = {0: "A", 1: "", 2: "C", 3: "D"}
    # ``np.nan`` is used because the ``np.NaN`` alias was removed in NumPy 2.0
    eattrs = {(0, 1): 4, (0, 2): 1, (2, 3): np.nan, (3, 1): 0.5}

    for n, name in nattrs.items():
        assert g.node_attributes["name"][n] == name

    for e, value in eattrs.items():
        if np.isnan(value):
            # NaN never compares equal to itself, so test it explicitly
            assert np.isnan(g.get_edge_attributes(edges=e, name="eattr"))
        else:
            assert g.get_edge_attributes(edges=e, name="eattr") == value
|
|
|
|
|
|
# ---------- #
|
|
# Test suite #
|
|
# ---------- #
|
|
|
|
suite = unittest.TestLoader().loadTestsFromTestCase(TestIO)

if __name__ == "__main__":
    if not nngt.get_config("mpi"):
        # run the function-based tests first, then the unittest cases
        standalone_tests = (
            test_empty_out_degree,
            test_str_attributes,
            test_structure,
            test_node_attributes,
            test_spatial,
            test_partial_graphml,
        )

        for run_test in standalone_tests:
            run_test()

        unittest.main()
|