Previous topic

ryvencore v0.5.0

Next topic

ryvencore.addons package

This Page

ryvencore package

Subpackages

Submodules

ryvencore.AddOn module

ALPHA

This module defines a simple add-on system to extend ryvencore’s functionalities. Some add-ons are provided in the addons package, and additional add-ons can be added and registered in the Session.

An add-on
  • has a name and a version

  • is session-local, not flow-local (but you can of course implement per-flow functionality)

  • manages its own state (in particular get_state() and set_state())

  • can store additional node-specific data in the node’s data dict when it’s serialized

  • will be accessible through the nodes API: self.get_addon('your_addon') in your nodes

TODO: The below statement is not true, I think. Add-ons are loaded first, and nodes can access

them during their initialization (but it may be a bad idea).

Add-on access is blocked during loading (deserialization), so nodes should not access any add-ons during the execution of Node.__init__ or Node.set_data. This prevents inconsistent states. Nodes are loaded first, then the add-ons. Therefore, the add-on should be sufficiently isolated and self-contained.

To define a custom add-on:
  • create a directory your_addons for you addons or use ryvencore’s addon directory

  • create a module for your addon YourAddon.py in your_addons

  • create a class YourAddon(ryvencore.AddOn) that defines your add-on’s functionality

  • instantiate it into a top-level variable: addon = YourAddon() at the end of the module

  • register your addon directory in the Session: session.register_addon_dir('path/to/your_addons')

See ryvencore.addons for examples.

class AddOn[source]

Bases: Base

register(session: Session)[source]

Called when the add-on is registered with a session.

connect_flow_events(flow: Flow)[source]

Connects flow events to the add-on.

on_flow_created(flow: Flow)[source]

VIRTUAL

Called when a flow is created.

on_flow_destroyed(flow: Flow)[source]

VIRTUAL

Called when a flow is destroyed.

on_node_created(node: Node)[source]

VIRTUAL

Called when a node is created and fully initialized (Node.load() has already been called, if necessary), but not yet added to the flow. Therefore, this is a good place to initialize the node with add-on-specific data.

This happens only once per node, whereas it can be added and removed multiple times, see AddOn.on_node_added() and AddOn.on_node_removed().

on_node_added(node: Node)[source]

VIRTUAL

Called when a node is added to a flow.

on_node_removed(node: Node)[source]

VIRTUAL

Called when a node is removed from a flow.

extend_node_data(node: Node, data: Dict)[source]

VIRTUAL

Invoked whenever any node is serialized. This method can be used to extend the node’s data dict with additional add-on.related data.

get_state() Dict[source]

VIRTUAL

Return the state of the add-on as JSON-compatible a dict. This dict will be extended by AddOn.complete_data().

set_state(state: dict, version: str)[source]

VIRTUAL

Set the state of the add-on from the dict generated in AddOn.get_state(). Notice that add-ons are loaded before the flows. If you need to re-establish any sort of connections between flows or nodes and your add-on, you should store state and do so in the according slot methods (e.g. on_node_added()).

data() Dict[source]

Supplements the data dict with additional data.

load(data: Dict)[source]

Loads the data dict generated in AddOn.data().

ryvencore.Data module

This file defines the Data type, which must be used to pass data between nodes. It should be subclassed to define custom data types. In particular, serialization and deserialization must be implemented for each respective type. Types that are pickle serializable by default can be used directly with :code`Data(my_data)`.

class Data(value=None, load_from=None)[source]

Bases: Base

Base class for data objects.

Subclass this class and implement serialization and deserialization accordingly to send data to other nodes. You must register your custom Data subclass with the Session.register_data() before using it (which especially applies to loading a project, custom Data subclasses used must be registered in advance).

In case of large data sets being shared, you might want to leave serialization empty, which means the graph will not enter the same state when you reload it, which is fine as long as your nodes are built appropriately e.g. such that you can quickly regenerate that state by updating the root node.

Be careful when consuming complex input data: modification can lead to undesired effects. In particular, if you share some data object \(d\) with successor nodes \(N1\) and \(N2\), and \(N1\) changes \(d\) directly, then \(N2\) will see the change as well, because they look at the same Data object:

>>> import ryvencore as rc
>>>
>>> class Producer(rc.Node):
...     init_outputs = [rc.NodeOutputType()]
...
...     def push_data(self, d):
...         self.d = d
...         self.update()
...
...     def update_event(self, inp=-1):
...         self.set_output_val(0, self.d)
>>>
>>> class Consumer(rc.Node):
...     init_inputs = [rc.NodeInputType()]
...
...     def update_event(self, inp=-1):
...         p = self.input(0).payload
...         p.append(4)
...         print(p)
>>>
>>> def build_and_run(D):
...     s = rc.Session()
...     s.register_node_type(Producer)
...     s.register_node_type(Consumer)
...     f = s.create_flow('main')
...     producer =  f.create_node(Producer)
...     consumer1 = f.create_node(Consumer)
...     consumer2 = f.create_node(Consumer)
...     f.connect_nodes(producer.outputs[0], consumer1.inputs[0])
...     f.connect_nodes(producer.outputs[0], consumer2.inputs[0])
...     producer.push_data(D)
>>>
>>> build_and_run(rc.Data([1, 2, 3]))
[1, 2, 3, 4]
[1, 2, 3, 4, 4]

This can be useful for optimization when sharing large data, but might not be what you want. To avoid this you might want to make sure to copy \(d\) when its payload is consumed:

>>> class MyListData(rc.Data):
...     @property
...     def payload(self):
...         return self._payload.copy()
>>>
>>> build_and_run(MyListData([1, 2, 3]))
[1, 2, 3, 4]
[1, 2, 3, 4]
identifier: str = 'Data'

unique Data identifier; you can set this manually in subclasses, if you don’t the class name will be used

legacy_identifiers: List[str] = []

a list of compatible identifiers in case you change the identifier

get_data()[source]

VIRTUAL

Transform the data object to a pickle serializable object. Do not use this function to access the payload, use payload instead.

set_data(data)[source]

VIRTUAL

Deserialize the data object from the dict created in get_data().

data() Dict[source]

Convert the object to a JSON compatible dict. Reserved field names are ‘GID’ and ‘version’.

load(data: Dict)[source]

Recreate the object state from the data dict returned by data().

Convention: don’t call this method in the constructor, invoke it manually from outside, if other components can depend on it (and be notified of its creation). Reason: If another component X depends on this one (and gets notified when this one is created), X should be notified before it gets notified of creation or loading of subcomponents created during this load. (E.g. add-ons need to know the flow before nodes are loaded.)

ryvencore.Flow module

This module defines the abstract flow, managing node, edges, etc. Flow execution is implemented by FlowExecutor class.

A flow is a directed, usually but not necessarily acyclic multi-graph of nodes and edges (connections between nodes). The nodes are the computational units and the edges define the flow of data between them. The fundamental operations to perform on a flow are:

  • adding a node

  • removing a node and incident edges

  • adding an edge between a node output and another node’s input

  • removing an edge

Flow Execution Modes

There are a couple of different modes / algorithms for executing a flow.

Data Flow

In the normal data flow mode, data is simply forward propagated on change. Specifically, this means the following:

A node output may have 0 or more outgoing connections/edges. When a node’s output value is updated, the new value is propagated to all connected nodes’ inputs. If there are multiple edges, the order of activation is undefined.

A node input may have 0 or 1 incoming connections/edges. When a node’s input receives new data, the node’s update event is invoked.

A flow execution is started once some node’s update event is invoked (either by direct invocation through node.update(), or by receiving input data), or some node’s output value is updated.

A node can consume inputs and update outputs at any time.

Assumptions:

  • no non-terminating feedback loops.

Data Flow with Optimization

Since the naive implementation of the above specification can be highly inefficient in some cases, a more advanced algorithm can be used. This algorithm ensures that, during a flow execution, each edge is updated at most once. It should implement the same semantics as the data flow algorithm, but with a slightly tightened assumption:

  • no feedback loops / cycles in the graph

  • nodes never modify their ports (inputs, outputs) during execution (update event)

The additional work required for this at the beginning of a flow execution is based on a DP algorithm running in \(\mathcal{O}(|V|+ |E|)\) time, where \(|V|\) is the number of nodes and \(|E|\) is the number of edges. However, when there are multiple consecutive executions without any subsequent changes to the graph, this work does not need to be repeated and execution is fast.

Execution Flow

The special exec mode uses an additional type of connection (edge): the execution connection. While pure data flows are the more common use case, some applications call for a slightly different paradigm. You can think of the exec mode as e.g. UnrealEngine’s blueprint system.

In exec mode, calling node.exec_output(index) has a similar effect as calling node.set_output_val(index, val) in data mode, but without any data being propagated, so it’s just a trigger signal. Pushing output data, however, does not cause updates in successor nodes.

When a node is updated (it received an update event through an exec connection), once it needs input data (it calls self.input(index)), if that input is connected to some predecessor node P, then P receives an update event with inp=-1, during which it should push the output data. Therefore, data is not forward propagated on change (node.set_output_val(index, value)), but generated on request (backwards, node.input() -> pred.update_event() -> pred.set_output_val() -> return).

The exec mode is still somewhat experimental, because the data mode is the far more common use case. It is not yet clear how to best implement the exec mode in a way that is both efficient and easy to use.

Assumptions:

  • no non-terminating feedback loops with exec connections

class Flow(session, title: str)[source]

Bases: Base

Manages all abstract flow components (nodes, edges, executors, etc.) and exposes methods for modification.

load(data: Dict)[source]

Loading a flow from data as previously returned by Flow.data().

load_components(nodes_data, conns_data, output_data)[source]

Loading nodes and their connections from data as previously returned by Flow.data(). This method will call Node.rebuilt() after connections are established on all nodes. Returns the new nodes and connections.

create_node(node_class: Type[Node], data=None)[source]

Creates, adds and returns a new node object

add_node(node: Node)[source]

Places the node object in the graph, Stores it, and causes the node’s Node.place_event() to be executed. Flow.create_node() automatically adds the node already, so no need to call this manually.

remove_node(node: Node)[source]

Removes a node from the flow without deleting it. Can be added again with Flow.add_node().

add_node_input(node: Node, inp: NodeInput, _call_flow_changed=True)[source]

updates internal data structures

add_node_output(node: Node, out: NodeOutput, _call_flow_changed=True)[source]

updates internal data structures.

remove_node_input(node: Node, inp: NodeInput, _call_flow_changed=True)[source]

updates internal data structures.

remove_node_output(node: Node, out: NodeOutput, _call_flow_changed=True)[source]

updates internal data structures.

check_connection_validity(c: Tuple[NodeOutput, NodeInput]) bool[source]

Checks whether a considered connect action is legal.

connect_nodes(out: NodeOutput, inp: NodeInput, silent=False) Tuple[NodeOutput, NodeInput] | None[source]

Connects two node ports. Returns the connection if successful, None otherwise.

disconnect_nodes(out: NodeOutput, inp: NodeInput, silent=False)[source]

Disconnects two node ports.

add_connection(c: Tuple[NodeOutput, NodeInput], silent=False)[source]

Adds an edge between two node ports.

remove_connection(c: Tuple[NodeOutput, NodeInput], silent=False)[source]

Removes an edge.

connected_inputs(out: NodeOutput) List[NodeInput][source]

Returns a list of all connected inputs to the given output port.

connected_output(inp: NodeInput) NodeOutput | None[source]

Returns the connected output port to the given input port, or None if it is not connected.

algorithm_mode() str[source]

Returns the current algorithm mode of the flow as string.

set_algorithm_mode(mode: str)[source]

Sets the algorithm mode of the flow from a string, possible values are ‘data’, ‘data opt’, and ‘exec’.

data() dict[source]

Serializes the flow: returns a JSON compatible dict containing all data of the flow.

ryvencore.Node module

class Node(params: Tuple[Flow, Session])[source]

Bases: Base

Base class for all node blueprints. Such a blueprint is made by subclassing this class and registering that subclass in the session. Actual node objects are instances of it. The node’s static properties are static attributes. Refer to python’s static class attributes behavior.

title = ''

the node’s title

tags: List[str] = []

a list of tag strings, often useful for searching etc.

version: str | None = None

version tag, use it!

init_inputs: List[NodeInputType] = []

list of node input types determining the initial inputs

init_outputs: List[NodeOutputType] = []

initial outputs list, see init_inputs

identifier: str

unique node identifier string. if not given it will set it to the class name when registering in the session

legacy_identifiers: List[str] = []

a list of compatible identifiers, useful when you change the class name (and hence the identifier) to provide backward compatibility to load old projects that rely on the old identifier

identifier_prefix: str | None = None

becomes part of the identifier if set; can be useful for grouping nodes

initialize()[source]

Sets up the node ports.

after_placement()[source]

Called from Flow when the nodes gets added.

prepare_removal()[source]

Called from Flow when the node gets removed.

update(inp=-1)[source]

Activates the node, causing an update_event() if block_updates is not set. For performance-, simplicity-, and maintainability-reasons activation is now fully handed over to the operating FlowExecutor, and not managed decentralized in Node, NodePort, and Connection anymore.

input(index: int) Data | None[source]

Returns the data residing at the data input of given index.

Do not call on exec inputs.

exec_output(index: int)[source]

Executes an exec output, causing activation of all connections.

Do not call on data outputs.

set_output_val(index: int, data: Data)[source]

Sets the value of a data output causing activation of all connections in data mode.

update_event(inp=-1)[source]

VIRTUAL

Gets called when an input received a signal or some node requested data of an output in exec mode. Implement this in your node class, this is the place where the main processing of your node should happen.

place_event()[source]

VIRTUAL

Called once the node object has been fully initialized and placed in the flow. When loading content, place_event() is executed before connections are built.

Notice that this method gets executed every time the node is added to the flow, which can happen more than once if the node was subsequently removed (e.g. due to undo/redo operations).

remove_event()[source]

VIRTUAL

Called when the node is removed from the flow; useful for stopping threads and timers etc.

additional_data() Dict[source]

VIRTUAL

additional_data()/load_additional_data() is almost equivalent to get_state()/set_state(), but it turned out to be useful for frontends to have their own dedicated version, so get_state()/set_state() stays clean for all specific node subclasses.

load_additional_data(data: Dict)[source]

VIRTUAL

For loading the data returned by additional_data().

get_state() Dict[source]

VIRTUAL

If your node is stateful, implement this method for serialization. It should return a JSON compatible dict that encodes your node’s state. The dict will be passed to set_state() when the node is loaded.

set_state(data: Dict, version)[source]

VIRTUAL

Opposite of get_state(), reconstruct any custom internal state here. Notice, that add-ons might not yet be fully available here, but in place_event() the should be.

rebuilt()[source]

VIRTUAL

If the node was created by loading components in the flow (see Flow.load_components()), this method will be called after the node has been added to the graph and incident connections are established.

create_input(label: str = '', type_: str = 'data', default: Data | None = None, load_from=None, insert: int | None = None)[source]

Creates and adds a new input at the end or index insert if specified.

delete_input(index: int)[source]

Disconnects and removes an input.

create_output(label: str = '', type_: str = 'data', load_from=None, insert: int | None = None)[source]

Creates and adds a new output at the end or index insert if specified.

delete_output(index: int)[source]

Disconnects and removes output.

get_addon(name: str)[source]

Returns an add-on registered in the session by name, or None if it wasn’t found.

load(data)[source]

Initializes the node from the data dict returned by Node.data(). Called by the flow, before the node is added to it. It does not crash on exception when loading user_data, as this is not uncommon when developing nodes.

data() Dict[source]

Serializes the node’s metadata, current configuration, and user state into a JSON-compatible dict, from which the node can be loaded later using Node.load().

ryvencore.Session module

class Session(gui: bool = False, load_addons: bool = False)[source]

Bases: Base

The Session is the top level interface to your project. It mainly manages flows, nodes, and add-ons and provides methods for serialization and deserialization of the project.

register_addons(location: str | None = None)[source]

Loads all addons from the given location, or from ryvencore’s addons directory if location is None. location can be an absolute path to any readable directory. New addons can be registered at any time. Addons cannot be de-registered. See ryvencore.AddOn.

register_node_types(node_types: List[Type[Node]])[source]

Registers a list of Nodes which then become available in the flows. Do not attempt to place nodes in flows that haven’t been registered in the session before.

register_node_type(node_class: Type[Node])[source]

Registers a single node.

unregister_node(node_class: Type[Node])[source]

Unregisters a node which will then be removed from the available list. Existing instances won’t be affected.

all_node_objects() List[Node][source]

Returns a list of all node objects instantiated in any flow.

register_data_type(data_type_class: Type[Data])[source]

Registers a new Data subclass which will then be available in the flows.

register_data_types(data_type_classes: List[Type[Data]])[source]

Registers a list of Data subclasses which will then be available in the flows.

create_flow(title: str, data: Dict | None = None) Flow[source]

Creates and returns a new flow. If data is provided the title parameter will be ignored.

rename_flow(flow: Flow, title: str) bool[source]

Renames an existing flow and returns success boolean.

flow_title_valid(title: str) bool[source]

Checks whether a considered title for a new flow is valid (unique) or not.

delete_flow(flow: Flow)[source]

Deletes an existing flow.

load(data: Dict) List[Flow][source]

Loads a project and raises an exception if required nodes are missing (not registered).

serialize() Dict[source]

Returns the project as JSON compatible dict to be saved and loaded again using load()

data() dict[source]

Serializes the project’s abstract state into a JSON compatible dict. Pass to load() in a new session to restore. Don’t use this function for saving, use serialize() in order to include the effects of Base.complete_data().

Module contents