|
""" |
|
Utility functions for |
|
|
|
- building and importing modules on test time, using a temporary location |
|
- detecting if compilers are present |
|
- determining paths to tests |
|
|
|
""" |
|
import glob |
|
import os |
|
import sys |
|
import subprocess |
|
import tempfile |
|
import shutil |
|
import atexit |
|
import textwrap |
|
import re |
|
import pytest |
|
import contextlib |
|
import numpy |
|
import concurrent.futures |
|
|
|
from pathlib import Path |
|
from numpy._utils import asunicode |
|
from numpy.testing import temppath, IS_WASM |
|
from importlib import import_module |
|
from numpy.f2py._backends._meson import MesonBackend |
|
|
|
|
|
|
|
|
|
|
|
def check_language(lang, code_snippet=None): |
|
if sys.platform == "win32": |
|
pytest.skip("No Fortran tests on Windows (Issue #25134)", allow_module_level=True) |
|
tmpdir = tempfile.mkdtemp() |
|
try: |
|
meson_file = os.path.join(tmpdir, "meson.build") |
|
with open(meson_file, "w") as f: |
|
f.write("project('check_compilers')\n") |
|
f.write(f"add_languages('{lang}')\n") |
|
if code_snippet: |
|
f.write(f"{lang}_compiler = meson.get_compiler('{lang}')\n") |
|
f.write(f"{lang}_code = '''{code_snippet}'''\n") |
|
f.write( |
|
f"_have_{lang}_feature =" |
|
f"{lang}_compiler.compiles({lang}_code," |
|
f" name: '{lang} feature check')\n" |
|
) |
|
try: |
|
runmeson = subprocess.run( |
|
["meson", "setup", "btmp"], |
|
check=False, |
|
cwd=tmpdir, |
|
capture_output=True, |
|
) |
|
except subprocess.CalledProcessError: |
|
pytest.skip("meson not present, skipping compiler dependent test", allow_module_level=True) |
|
return runmeson.returncode == 0 |
|
finally: |
|
shutil.rmtree(tmpdir) |
|
return False |
|
|
|
|
|
fortran77_code = ''' |
|
C Example Fortran 77 code |
|
PROGRAM HELLO |
|
PRINT *, 'Hello, Fortran 77!' |
|
END |
|
''' |
|
|
|
fortran90_code = ''' |
|
! Example Fortran 90 code |
|
program hello90 |
|
type :: greeting |
|
character(len=20) :: text |
|
end type greeting |
|
|
|
type(greeting) :: greet |
|
greet%text = 'hello, fortran 90!' |
|
print *, greet%text |
|
end program hello90 |
|
''' |
|
|
|
|
|
class CompilerChecker: |
|
def __init__(self): |
|
self.compilers_checked = False |
|
self.has_c = False |
|
self.has_f77 = False |
|
self.has_f90 = False |
|
|
|
def check_compilers(self): |
|
if (not self.compilers_checked) and (not sys.platform == "cygwin"): |
|
with concurrent.futures.ThreadPoolExecutor() as executor: |
|
futures = [ |
|
executor.submit(check_language, "c"), |
|
executor.submit(check_language, "fortran", fortran77_code), |
|
executor.submit(check_language, "fortran", fortran90_code) |
|
] |
|
|
|
self.has_c = futures[0].result() |
|
self.has_f77 = futures[1].result() |
|
self.has_f90 = futures[2].result() |
|
|
|
self.compilers_checked = True |
|
|
|
if not IS_WASM: |
|
checker = CompilerChecker() |
|
checker.check_compilers() |
|
|
|
def has_c_compiler(): |
|
return checker.has_c |
|
|
|
def has_f77_compiler(): |
|
return checker.has_f77 |
|
|
|
def has_f90_compiler(): |
|
return checker.has_f90 |
|
|
|
def has_fortran_compiler(): |
|
return (checker.has_f90 and checker.has_f77) |
|
|
|
|
|
|
|
|
|
|
|
|
|
_module_dir = None |
|
_module_num = 5403 |
|
|
|
if sys.platform == "cygwin": |
|
NUMPY_INSTALL_ROOT = Path(__file__).parent.parent.parent |
|
_module_list = list(NUMPY_INSTALL_ROOT.glob("**/*.dll")) |
|
|
|
|
|
def _cleanup(): |
|
global _module_dir |
|
if _module_dir is not None: |
|
try: |
|
sys.path.remove(_module_dir) |
|
except ValueError: |
|
pass |
|
try: |
|
shutil.rmtree(_module_dir) |
|
except OSError: |
|
pass |
|
_module_dir = None |
|
|
|
|
|
def get_module_dir(): |
|
global _module_dir |
|
if _module_dir is None: |
|
_module_dir = tempfile.mkdtemp() |
|
atexit.register(_cleanup) |
|
if _module_dir not in sys.path: |
|
sys.path.insert(0, _module_dir) |
|
return _module_dir |
|
|
|
|
|
def get_temp_module_name(): |
|
|
|
global _module_num |
|
get_module_dir() |
|
name = "_test_ext_module_%d" % _module_num |
|
_module_num += 1 |
|
if name in sys.modules: |
|
|
|
raise RuntimeError("Temporary module name already in use.") |
|
return name |
|
|
|
|
|
def _memoize(func): |
|
memo = {} |
|
|
|
def wrapper(*a, **kw): |
|
key = repr((a, kw)) |
|
if key not in memo: |
|
try: |
|
memo[key] = func(*a, **kw) |
|
except Exception as e: |
|
memo[key] = e |
|
raise |
|
ret = memo[key] |
|
if isinstance(ret, Exception): |
|
raise ret |
|
return ret |
|
|
|
wrapper.__name__ = func.__name__ |
|
return wrapper |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@_memoize |
|
def build_module(source_files, options=[], skip=[], only=[], module_name=None): |
|
""" |
|
Compile and import a f2py module, built from the given files. |
|
|
|
""" |
|
|
|
code = f"import sys; sys.path = {sys.path!r}; import numpy.f2py; numpy.f2py.main()" |
|
|
|
d = get_module_dir() |
|
|
|
if not has_fortran_compiler(): |
|
pytest.skip("No Fortran compiler available") |
|
|
|
|
|
dst_sources = [] |
|
f2py_sources = [] |
|
for fn in source_files: |
|
if not os.path.isfile(fn): |
|
raise RuntimeError("%s is not a file" % fn) |
|
dst = os.path.join(d, os.path.basename(fn)) |
|
shutil.copyfile(fn, dst) |
|
dst_sources.append(dst) |
|
|
|
base, ext = os.path.splitext(dst) |
|
if ext in (".f90", ".f95", ".f", ".c", ".pyf"): |
|
f2py_sources.append(dst) |
|
|
|
assert f2py_sources |
|
|
|
|
|
if module_name is None: |
|
module_name = get_temp_module_name() |
|
gil_options = [] |
|
if '--freethreading-compatible' not in options and '--no-freethreading-compatible' not in options: |
|
|
|
gil_options = ['--freethreading-compatible'] |
|
f2py_opts = ["-c", "-m", module_name] + options + gil_options + f2py_sources |
|
f2py_opts += ["--backend", "meson"] |
|
if skip: |
|
f2py_opts += ["skip:"] + skip |
|
if only: |
|
f2py_opts += ["only:"] + only |
|
|
|
|
|
cwd = os.getcwd() |
|
try: |
|
os.chdir(d) |
|
cmd = [sys.executable, "-c", code] + f2py_opts |
|
p = subprocess.Popen(cmd, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.STDOUT) |
|
out, err = p.communicate() |
|
if p.returncode != 0: |
|
raise RuntimeError("Running f2py failed: %s\n%s" % |
|
(cmd[4:], asunicode(out))) |
|
finally: |
|
os.chdir(cwd) |
|
|
|
|
|
for fn in dst_sources: |
|
os.unlink(fn) |
|
|
|
|
|
if sys.platform == "cygwin": |
|
|
|
|
|
|
|
_module_list.extend( |
|
glob.glob(os.path.join(d, "{:s}*".format(module_name))) |
|
) |
|
subprocess.check_call( |
|
["/usr/bin/rebase", "--database", "--oblivious", "--verbose"] |
|
+ _module_list |
|
) |
|
|
|
|
|
return import_module(module_name) |
|
|
|
|
|
@_memoize |
|
def build_code(source_code, |
|
options=[], |
|
skip=[], |
|
only=[], |
|
suffix=None, |
|
module_name=None): |
|
""" |
|
Compile and import Fortran code using f2py. |
|
|
|
""" |
|
if suffix is None: |
|
suffix = ".f" |
|
with temppath(suffix=suffix) as path: |
|
with open(path, "w") as f: |
|
f.write(source_code) |
|
return build_module([path], |
|
options=options, |
|
skip=skip, |
|
only=only, |
|
module_name=module_name) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SimplifiedMesonBackend(MesonBackend): |
|
def __init__(self, *args, **kwargs): |
|
super().__init__(*args, **kwargs) |
|
|
|
def compile(self): |
|
self.write_meson_build(self.build_dir) |
|
self.run_meson(self.build_dir) |
|
|
|
|
|
def build_meson(source_files, module_name=None, **kwargs): |
|
""" |
|
Build a module via Meson and import it. |
|
""" |
|
|
|
|
|
if not has_fortran_compiler(): |
|
pytest.skip("No Fortran compiler available") |
|
|
|
build_dir = get_module_dir() |
|
if module_name is None: |
|
module_name = get_temp_module_name() |
|
|
|
|
|
backend = SimplifiedMesonBackend( |
|
modulename=module_name, |
|
sources=source_files, |
|
extra_objects=kwargs.get("extra_objects", []), |
|
build_dir=build_dir, |
|
include_dirs=kwargs.get("include_dirs", []), |
|
library_dirs=kwargs.get("library_dirs", []), |
|
libraries=kwargs.get("libraries", []), |
|
define_macros=kwargs.get("define_macros", []), |
|
undef_macros=kwargs.get("undef_macros", []), |
|
f2py_flags=kwargs.get("f2py_flags", []), |
|
sysinfo_flags=kwargs.get("sysinfo_flags", []), |
|
fc_flags=kwargs.get("fc_flags", []), |
|
flib_flags=kwargs.get("flib_flags", []), |
|
setup_flags=kwargs.get("setup_flags", []), |
|
remove_build_dir=kwargs.get("remove_build_dir", False), |
|
extra_dat=kwargs.get("extra_dat", {}), |
|
) |
|
|
|
backend.compile() |
|
|
|
|
|
sys.path.insert(0, f"{build_dir}/{backend.meson_build_dir}") |
|
return import_module(module_name) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class F2PyTest: |
|
code = None |
|
sources = None |
|
options = [] |
|
skip = [] |
|
only = [] |
|
suffix = ".f" |
|
module = None |
|
_has_c_compiler = None |
|
_has_f77_compiler = None |
|
_has_f90_compiler = None |
|
|
|
@property |
|
def module_name(self): |
|
cls = type(self) |
|
return f'_{cls.__module__.rsplit(".",1)[-1]}_{cls.__name__}_ext_module' |
|
|
|
@classmethod |
|
def setup_class(cls): |
|
if sys.platform == "win32": |
|
pytest.skip("Fails with MinGW64 Gfortran (Issue #9673)") |
|
F2PyTest._has_c_compiler = has_c_compiler() |
|
F2PyTest._has_f77_compiler = has_f77_compiler() |
|
F2PyTest._has_f90_compiler = has_f90_compiler() |
|
F2PyTest._has_fortran_compiler = has_fortran_compiler() |
|
|
|
def setup_method(self): |
|
if self.module is not None: |
|
return |
|
|
|
codes = self.sources if self.sources else [] |
|
if self.code: |
|
codes.append(self.suffix) |
|
|
|
needs_f77 = any(str(fn).endswith(".f") for fn in codes) |
|
needs_f90 = any(str(fn).endswith(".f90") for fn in codes) |
|
needs_pyf = any(str(fn).endswith(".pyf") for fn in codes) |
|
|
|
if needs_f77 and not self._has_f77_compiler: |
|
pytest.skip("No Fortran 77 compiler available") |
|
if needs_f90 and not self._has_f90_compiler: |
|
pytest.skip("No Fortran 90 compiler available") |
|
if needs_pyf and not self._has_fortran_compiler: |
|
pytest.skip("No Fortran compiler available") |
|
|
|
|
|
if self.code is not None: |
|
self.module = build_code( |
|
self.code, |
|
options=self.options, |
|
skip=self.skip, |
|
only=self.only, |
|
suffix=self.suffix, |
|
module_name=self.module_name, |
|
) |
|
|
|
if self.sources is not None: |
|
self.module = build_module( |
|
self.sources, |
|
options=self.options, |
|
skip=self.skip, |
|
only=self.only, |
|
module_name=self.module_name, |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def getpath(*a): |
|
|
|
d = Path(numpy.f2py.__file__).parent.resolve() |
|
return d.joinpath(*a) |
|
|
|
|
|
@contextlib.contextmanager |
|
def switchdir(path): |
|
curpath = Path.cwd() |
|
os.chdir(path) |
|
try: |
|
yield |
|
finally: |
|
os.chdir(curpath) |
|
|