1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
use crate::{
	callbacks::{EventCallbackInterface, PyEventCallbackInterface},
	comm::ChannelHandler,
	config::{Config, Neo4jQueryConfig},
	cross::{CLRepr, CLReprPython},
	querent::{py_runtime, PyRuntime, QuerentError},
	tokio_runtime,
};
use futures::TryFutureExt;
use log;
use pyo3::{prelude::*, types::PyFunction};
use std::{collections::HashMap, sync::Mutex};
use tokio::runtime::Runtime;

/// Represents a workflow.
#[derive(Debug, Clone)]
#[pyclass]
pub struct QueryEngine {
	pub name: String,
	pub id: String,
	pub import: String,
	pub attr: String,
	pub code: Option<String>,
	pub arguments: Vec<CLRepr>,
	pub config: Option<Neo4jQueryConfig>,
}

pub struct QueryEngineManager {
	/// Mutex-protected map of engines for query, keyed by their unique identifier.
	pub workflows: Mutex<HashMap<String, QueryEngine>>,
	/// Reference to the Python runtime.
	pub runtime: &'static PyRuntime,
}

impl QueryEngineManager {
	/// Creates a new `WorkflowManager` instance.
	pub fn new() -> Result<Self, String> {
		let runtime = py_runtime().map_err(|e| e.to_string())?;
		Ok(Self { workflows: Mutex::new(HashMap::new()), runtime })
	}
	/// Adds a workflow to the manager.
	pub fn add_workflow(&self, workflow: QueryEngine) -> Result<(), String> {
		let mut workflows =
			self.workflows.lock().map_err(|e| format!("Mutex lock failed: {}", e))?;
		if workflows.contains_key(&workflow.id) {
			return Err("Workflow with the same ID already exists.".to_string());
		} else {
			workflows.insert(workflow.id.clone(), workflow.clone());
		}
		Ok(())
	}

	/// Retrieves a list of all workflows managed by this manager.
	pub fn get_workflows(&self) -> Vec<QueryEngine> {
		let workflows = self.workflows.lock().unwrap();
		workflows.values().cloned().collect()
	}

	/// Starts workflows by executing their Python code asynchronously.
	pub async fn start_workflows(&self) -> Result<(), QuerentError> {
		let workflows = self.get_workflows();
		let handles: Vec<_> = workflows
			.iter()
			.map(|_workflow| {
				let args = _workflow.arguments.clone();
				let res = match &_workflow.code {
					None => Python::with_gil(|py| {
						let async_mod = py.import(_workflow.import.as_str()).map_err(|e| {
							log::error!("Failed to import module {}: {}", _workflow.import, e);
							QuerentError::internal(e.to_string())
						})?;

						let coroutine =
							async_mod.getattr(_workflow.attr.as_str()).map_err(|_| {
								log::error!("Failed to find start function.");
								QuerentError::internal("Failed to find start function.".to_string())
							})?;

						let querent_py_fun: Py<PyFunction> = coroutine.extract().map_err(|e| {
							log::error!("Failed to extract function: {}", e);
							QuerentError::internal(e.to_string())
						})?;

						let call_future = self.runtime.call_async(
							querent_py_fun,
							args,
							None,
							_workflow.config.clone(),
						);
						Ok(call_future)
					}),
					Some(code) => {
						let module_file: String = _workflow.id.clone() + ".py";
						Python::with_gil(|py| {
							let dynamic_module = PyModule::from_code(
								py,
								code.as_str(),
								module_file.as_str(),
								_workflow.name.as_str(),
							)
							.map_err(|e| {
								log::error!("Failed to import module {}: {}", _workflow.import, e);
								QuerentError::internal(e.to_string())
							})?;

							let attr_fun =
								dynamic_module.getattr(_workflow.attr.as_str()).map_err(|_| {
									log::error!("Failed to find start function.");
									QuerentError::internal(
										"Failed to find start function.".to_string(),
									)
								})?;

							let querent_py_fun: Py<PyFunction> =
								attr_fun.extract().map_err(|e| {
									log::error!("Failed to extract function: {}", e);
									QuerentError::internal(e.to_string())
								})?;

							let call_future = self.runtime.call_async(
								querent_py_fun,
								args,
								None,
								_workflow.config.clone(),
							);
							Ok(call_future)
						})
					},
				};
				res
			})
			.collect();
		for handle in handles {
			match handle {
				Ok(future) => match future.await {
					Ok(_) => log::info!("Workflow started."),
					Err(e) => {
						log::error!("Failed to start workflow: {}", e);
						return Err(QuerentError::internal(e.to_string()));
					},
				},
				Err(e) => {
					log::error!("Failed to start workflow: {}", e);
					return Err(e);
				},
			}
		}
		Ok(())
	}
}

impl Drop for QueryEngineManager {
	/// Drops the `WorkflowManager` instance, cleaning up resources.
	fn drop(&mut self) {
		log::info!("Dropping WorkflowManager");
		let _ = self.runtime;
	}
}