blob: 62baeaa7f849052757c4b11f131da015c8c0a037 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 F4PGA Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
"""
This module is intended for wrapping simple scripts without rewriting them as
an sfbuild module. This is mostly to maintain compatibility with workflows
that do not use sfbuild and instead rely on legacy scripts.
Accepted module parameters:
* `stage_name` (string, optional): Name describing the stage
* `script` (string, mandatory): Path to the script to be executed
* `interpreter` (string, optional): Interpreter for the script
* `cwd` (string, optional): Current Working Directory for the script
* `outputs` (dict[string -> dict[string -> string]], mandatory):
A dict with output descriptions (dicts).
Keys name output dependencies.
* `mode` (string, mandatory): "file" or "stdout".
Describes how the output is grabbed from the script.
* `file` (string, required if `mode` is "file"): Name of the file generated by the script.
* `target` (string, required): Default name of the file of the generated dependency.
You can use all values available durng map_io stage.
Each input dependency alsogets two extra values associated with it:
`:dependency_name[noext]`, which contains the path to the dependency the extension with anything after last "."
removed and `:dependency_name[dir]` which contains directory paths of the dependency.
This is useful for deriving an output name from the input.
* `meta` (string, optional): Description of the output dependency.
* `inputs` (dict[string -> string | bool], mandatory):
A dict with input descriptions.
Key is either a name of a named argument or a position of unnamed argument prefaced with "#" (eg. "#1").
Positions are indexed from 1, as it's a convention that 0th argument is the path of the executed program.
Values are strings that can contains references to variables to be resolved after the project flow configuration is
loaded (that means they can reference values and dependencies which are to be set by the user).
All of modules inputs will be determined by the references used.
Thus dependency and value definitions are implicit.
If the value of the resolved string is empty and is associated with a named argument, the argument in question will be
skipped entirely.
This allows using optional dependencies.
To use a named argument as a flag instead, set it to `true`.
"""
# TODO: `environment` input kind
from pathlib import Path
from re import match as re_match, finditer as re_finditer
from f4pga.flows.common import decompose_depname, deep, get_verbosity_level, sub
from f4pga.flows.module import Module, ModuleContext
def _get_param(params, name: str):
param = params.get(name)
if not param:
raise Exception(f"generic module wrapper parameters " f"missing `{name}` field")
return param
def _parse_param_def(param_def: str):
if param_def[0] == "#":
return "positional", int(param_def[1:])
elif param_def[0] == "$":
return "environmental", param_def[1:]
elif param_def[0] == "-":
return "char", param_def[1:]
return "named", param_def
class InputReferences:
dependencies: "set[str]"
values: "set[str]"
def merge(self, other):
self.dependencies.update(other.dependencies)
self.values.update(other.values)
def __init__(self):
self.dependencies = set()
self.values = set()
def _get_input_references(input: str) -> InputReferences:
refs = InputReferences()
if type(input) is not str:
return refs
for match in re_finditer("\$\{([^${}]*)\}", input):
match_str = match.group(1)
if match_str[0] != ":":
refs.values.add(match_str)
continue
if len(match_str) < 2:
raise Exception("Dependency name must be at least 1 character long")
refs.dependencies.add(re_match("([^\\[\\]]*)", match_str[1:]).group(1))
return refs
def _make_noop1():
def noop(_):
return
return noop
def _tailcall1(self, fun):
def newself(arg, self=self, fun=fun):
fun(arg)
self(arg)
return newself
class GenericScriptWrapperModule(Module):
script_path: str
stdout_target: "None | tuple[str, str]"
file_outputs: "list[tuple[str, str, str]]"
interpreter: "None | str"
cwd: "None | str"
@staticmethod
def _add_extra_values_to_env(ctx: ModuleContext):
for take_name, take_path in vars(ctx.takes).items():
if take_path is not None:
ctx.r_env.values[f":{take_name}[noext]"] = deep(lambda p: str(Path(p).with_suffix("")))(take_path)
ctx.r_env.values[f":{take_name}[dir]"] = deep(lambda p: str(Path(p).parent.resolve()))(take_path)
def map_io(self, ctx: ModuleContext):
self._add_extra_values_to_env(ctx)
outputs = {}
for dep, _, out_path in self.file_outputs:
out_path_resolved = ctx.r_env.resolve(out_path, final=True)
outputs[dep] = out_path_resolved
if self.stdout_target:
out_path_resolved = ctx.r_env.resolve(self.stdout_target[1], final=True)
outputs[self.stdout_target[0]] = out_path_resolved
return outputs
def execute(self, ctx: ModuleContext):
self._add_extra_values_to_env(ctx)
cwd = ctx.r_env.resolve(self.cwd)
sub_args = (
([ctx.r_env.resolve(self.interpreter, final=True)] if self.interpreter else [])
+ (
self.script_path
if isinstance(self.script_path, list)
else [ctx.r_env.resolve(self.script_path, final=True)]
)
+ self.get_args(ctx)
)
sub_env = self.get_env(ctx)
# XXX: This may produce incorrect string if arguments contains whitespace
# characters
cmd = " ".join(sub_args)
if get_verbosity_level() >= 2:
yield f"Running script...\n {cmd}"
else:
yield f"Running an externel script..."
data = sub(*sub_args, cwd=cwd, env=sub_env)
yield "Writing outputs..."
if self.stdout_target:
target = ctx.r_env.resolve(self.stdout_target[1], final=True)
with open(target, "wb") as f:
f.write(data)
for dep, file, _ in self.file_outputs:
file = ctx.r_env.resolve(file, final=True)
target = ctx.r_env.resolve(getattr(ctx.outputs, dep), final=True)
print(f"file is: {file}, target is: {target}")
if target != file:
Path(file).rename(target)
def _init_outputs(self, output_defs: "dict[str, dict[str, str]]"):
self.stdout_target = None
self.file_outputs = []
for dep_name, output_def in output_defs.items():
dname, _ = decompose_depname(dep_name)
self.produces.append(dep_name)
meta = output_def.get("meta")
if meta is str:
self.prod_meta[dname] = meta
mode = output_def.get("mode")
if type(mode) is not str:
raise Exception(f"Output mode for `{dep_name}` is not specified")
target = output_def.get("target")
if type(target) is not str:
raise Exception("`target` field is not specified")
if mode == "file":
file = output_def.get("file")
if type(file) is not str:
raise Exception("Output file is not specified")
self.file_outputs.append((dname, file, target))
elif mode == "stdout":
if self.stdout_target is not None:
raise Exception("stdout output is already specified")
self.stdout_target = dname, target
# A very functional approach
def _init_inputs(self, input_defs):
positional_args = []
named_args = []
env_vars = {}
refs = InputReferences()
get_args = _make_noop1()
get_env = _make_noop1()
for arg_code, input in input_defs.items():
param_kind, param = _parse_param_def(arg_code)
push = None
push_env = None
if param_kind == "named":
def push_named(val: "str | bool | int", param=param):
nonlocal named_args
if type(val) is bool:
named_args.append(f"--{param}")
else:
named_args += [f"--{param}", str(val)]
push = push_named
elif param_kind == "char":
def push_char(val: "str | bool | int", param=param):
nonlocal named_args
if type(val) is bool:
named_args.append(f"-{param}")
else:
named_args += [f"-{param}", str(val)]
push = push_char
elif param_kind == "environmental":
def push_environ(val: "str | bool | int", param=param):
nonlocal env_vars
env_vars[param] = val
push_env = push_environ
else:
def push_positional(val: str, param=param):
nonlocal positional_args
positional_args.append((param, val))
push = push_positional
input_refs = _get_input_references(input)
refs.merge(input_refs)
if push is not None:
def push_q(ctx: ModuleContext, push=push, input=input):
val = ctx.r_env.resolve(input, final=True)
if val != "":
push(val)
get_args = _tailcall1(get_args, push_q)
else:
def push_q(ctx: ModuleContext, push_env=push_env, input=input):
val = ctx.r_env.resolve(input, final=True)
if val != "":
push_env(val)
get_env = _tailcall1(get_env, push_q)
def get_all_args(ctx: ModuleContext):
nonlocal get_args, positional_args, named_args
get_args(ctx)
positional_args.sort(key=lambda t: t[0])
pos = [a for _, a in positional_args]
return named_args + pos
def get_all_env(ctx: ModuleContext):
nonlocal get_env, env_vars
get_env(ctx)
if len(env_vars.items()) == 0:
return None
return env_vars
setattr(self, "get_args", get_all_args)
setattr(self, "get_env", get_all_env)
for dep in refs.dependencies:
self.takes.append(dep)
for val in refs.values:
self.values.append(val)
def __init__(self, params):
stage_name = params.get("stage_name")
self.name = f"{'<unknown>' if stage_name is None else stage_name}-generic"
self.no_of_phases = 2
self.script_path = params.get("script")
self.interpreter = params.get("interpreter")
self.cwd = params.get("cwd")
self.takes = []
self.produces = []
self.values = []
self.prod_meta = {}
self._init_outputs(_get_param(params, "outputs"))
self._init_inputs(_get_param(params, "inputs"))
ModuleClass = GenericScriptWrapperModule