|
| 1 | +use candid::{CandidType, Nat, Principal}; |
| 2 | +use ciborium::into_writer; |
| 3 | +use futures::FutureExt; |
| 4 | +use ic_cdk::api::management_canister::http_request::{CanisterHttpRequestArgument, HttpResponse}; |
| 5 | +use serde::{Deserialize, Serialize}; |
| 6 | +use std::collections::BTreeSet; |
| 7 | + |
| 8 | +use crate::{agent::Agent, cose::CoseClient, store}; |
| 9 | + |
| 10 | +#[derive(CandidType, Deserialize, Serialize)] |
| 11 | +pub struct StateInfo { |
| 12 | + pub ecdsa_key_name: String, |
| 13 | + pub proxy_token_public_key: String, |
| 14 | + pub proxy_token_refresh_interval: u64, // seconds |
| 15 | + pub agents: Vec<Agent>, |
| 16 | + pub managers: BTreeSet<Principal>, |
| 17 | + pub subnet_size: u64, |
| 18 | + pub service_fee: u64, // in cycles |
| 19 | + pub incoming_cycles: u128, |
| 20 | + pub uncollectible_cycles: u128, |
| 21 | + pub cose: Option<CoseClient>, |
| 22 | +} |
| 23 | + |
| 24 | +#[ic_cdk::query] |
| 25 | +fn get_state() -> Result<StateInfo, ()> { |
| 26 | + let s = store::state::with(|s| StateInfo { |
| 27 | + ecdsa_key_name: s.ecdsa_key_name.clone(), |
| 28 | + proxy_token_public_key: s.proxy_token_public_key.clone(), |
| 29 | + proxy_token_refresh_interval: s.proxy_token_refresh_interval, |
| 30 | + agents: s |
| 31 | + .agents |
| 32 | + .iter() |
| 33 | + .map(|a| Agent { |
| 34 | + name: a.name.clone(), |
| 35 | + endpoint: a.endpoint.clone(), |
| 36 | + max_cycles: a.max_cycles, |
| 37 | + proxy_token: None, |
| 38 | + }) |
| 39 | + .collect(), |
| 40 | + managers: s.managers.clone(), |
| 41 | + subnet_size: s.subnet_size, |
| 42 | + service_fee: s.service_fee, |
| 43 | + incoming_cycles: s.incoming_cycles, |
| 44 | + uncollectible_cycles: s.uncollectible_cycles, |
| 45 | + cose: s.cose.clone(), |
| 46 | + }); |
| 47 | + Ok(s) |
| 48 | +} |
| 49 | + |
| 50 | +#[ic_cdk::query] |
| 51 | +async fn proxy_http_request_cost(req: CanisterHttpRequestArgument) -> u128 { |
| 52 | + let calc = store::state::cycles_calculator(); |
| 53 | + calc.ingress_cost(ic_cdk::api::call::arg_data_raw_size()) |
| 54 | + + calc.http_outcall_request_cost(calc.count_request_bytes(&req), 1) |
| 55 | + + calc.http_outcall_response_cost(req.max_response_bytes.unwrap_or(10240) as usize, 1) |
| 56 | +} |
| 57 | + |
| 58 | +#[ic_cdk::query] |
| 59 | +async fn parallel_call_cost(req: CanisterHttpRequestArgument) -> u128 { |
| 60 | + let agents = store::state::get_agents(); |
| 61 | + let calc = store::state::cycles_calculator(); |
| 62 | + calc.ingress_cost(ic_cdk::api::call::arg_data_raw_size()) |
| 63 | + + calc.http_outcall_request_cost(calc.count_request_bytes(&req), agents.len()) |
| 64 | + + calc.http_outcall_response_cost( |
| 65 | + req.max_response_bytes.unwrap_or(10240) as usize, |
| 66 | + agents.len(), |
| 67 | + ) |
| 68 | +} |
| 69 | + |
| 70 | +/// Proxy HTTP request by all agents in sequence until one returns an status <= 500 result. |
| 71 | +#[ic_cdk::update] |
| 72 | +async fn proxy_http_request(req: CanisterHttpRequestArgument) -> HttpResponse { |
| 73 | + if !store::state::is_allowed(&ic_cdk::caller()) { |
| 74 | + return HttpResponse { |
| 75 | + status: Nat::from(403u64), |
| 76 | + body: "caller is not allowed".as_bytes().to_vec(), |
| 77 | + headers: vec![], |
| 78 | + }; |
| 79 | + } |
| 80 | + |
| 81 | + let agents = store::state::get_agents(); |
| 82 | + if agents.is_empty() { |
| 83 | + return HttpResponse { |
| 84 | + status: Nat::from(503u64), |
| 85 | + body: "no agents available".as_bytes().to_vec(), |
| 86 | + headers: vec![], |
| 87 | + }; |
| 88 | + } |
| 89 | + |
| 90 | + let calc = store::state::cycles_calculator(); |
| 91 | + store::state::receive_cycles( |
| 92 | + calc.ingress_cost(ic_cdk::api::call::arg_data_raw_size()), |
| 93 | + false, |
| 94 | + ); |
| 95 | + |
| 96 | + let req_size = calc.count_request_bytes(&req); |
| 97 | + let mut last_err: Option<HttpResponse> = None; |
| 98 | + for agent in agents { |
| 99 | + store::state::receive_cycles(calc.http_outcall_request_cost(req_size, 1), false); |
| 100 | + match agent.call(req.clone()).await { |
| 101 | + Ok(res) => { |
| 102 | + let cycles = calc.http_outcall_response_cost(calc.count_response_bytes(&res), 1); |
| 103 | + store::state::receive_cycles(cycles, true); |
| 104 | + return res; |
| 105 | + } |
| 106 | + Err(res) => last_err = Some(res), |
| 107 | + } |
| 108 | + } |
| 109 | + |
| 110 | + last_err.unwrap() |
| 111 | +} |
| 112 | + |
| 113 | +/// Proxy HTTP request by all agents in parallel and return the result if all are the same, |
| 114 | +/// or a 500 HttpResponse with all result. |
| 115 | +#[ic_cdk::update] |
| 116 | +async fn parallel_call_all_ok(req: CanisterHttpRequestArgument) -> HttpResponse { |
| 117 | + if !store::state::is_allowed(&ic_cdk::caller()) { |
| 118 | + return HttpResponse { |
| 119 | + status: Nat::from(403u64), |
| 120 | + body: "caller is not allowed".as_bytes().to_vec(), |
| 121 | + headers: vec![], |
| 122 | + }; |
| 123 | + } |
| 124 | + |
| 125 | + let agents = store::state::get_agents(); |
| 126 | + if agents.is_empty() { |
| 127 | + return HttpResponse { |
| 128 | + status: Nat::from(503u64), |
| 129 | + body: "no agents available".as_bytes().to_vec(), |
| 130 | + headers: vec![], |
| 131 | + }; |
| 132 | + } |
| 133 | + |
| 134 | + let calc = store::state::cycles_calculator(); |
| 135 | + let cycles = calc.ingress_cost(ic_cdk::api::call::arg_data_raw_size()) |
| 136 | + + calc.http_outcall_request_cost(calc.count_request_bytes(&req), agents.len()); |
| 137 | + store::state::receive_cycles(cycles, false); |
| 138 | + |
| 139 | + let results = |
| 140 | + futures::future::try_join_all(agents.iter().map(|agent| agent.call(req.clone()))).await; |
| 141 | + match results { |
| 142 | + Err(res) => res, |
| 143 | + Ok(res) => { |
| 144 | + let mut results = res.into_iter(); |
| 145 | + let base_result = results.next().unwrap_or_else(|| HttpResponse { |
| 146 | + status: Nat::from(503u64), |
| 147 | + body: "no agents available".as_bytes().to_vec(), |
| 148 | + headers: vec![], |
| 149 | + }); |
| 150 | + |
| 151 | + let cycles = calc |
| 152 | + .http_outcall_response_cost(calc.count_response_bytes(&base_result), agents.len()); |
| 153 | + store::state::receive_cycles(cycles, true); |
| 154 | + |
| 155 | + let mut inconsistent_results: Vec<_> = |
| 156 | + results.filter(|result| result != &base_result).collect(); |
| 157 | + if !inconsistent_results.is_empty() { |
| 158 | + inconsistent_results.push(base_result); |
| 159 | + let mut buf = vec![]; |
| 160 | + into_writer(&inconsistent_results, &mut buf) |
| 161 | + .expect("failed to encode inconsistent results"); |
| 162 | + return HttpResponse { |
| 163 | + status: Nat::from(500u64), |
| 164 | + body: buf, |
| 165 | + headers: vec![], |
| 166 | + }; |
| 167 | + } |
| 168 | + |
| 169 | + base_result |
| 170 | + } |
| 171 | + } |
| 172 | +} |
| 173 | + |
| 174 | +/// Proxy HTTP request by all agents in parallel and return the first (status <= 500) result. |
| 175 | +#[ic_cdk::update] |
| 176 | +async fn parallel_call_any_ok(req: CanisterHttpRequestArgument) -> HttpResponse { |
| 177 | + if !store::state::is_allowed(&ic_cdk::caller()) { |
| 178 | + return HttpResponse { |
| 179 | + status: Nat::from(403u64), |
| 180 | + body: "caller is not allowed".as_bytes().to_vec(), |
| 181 | + headers: vec![], |
| 182 | + }; |
| 183 | + } |
| 184 | + |
| 185 | + let agents = store::state::get_agents(); |
| 186 | + if agents.is_empty() { |
| 187 | + return HttpResponse { |
| 188 | + status: Nat::from(503u64), |
| 189 | + body: "no agents available".as_bytes().to_vec(), |
| 190 | + headers: vec![], |
| 191 | + }; |
| 192 | + } |
| 193 | + |
| 194 | + let calc = store::state::cycles_calculator(); |
| 195 | + let cycles = calc.ingress_cost(ic_cdk::api::call::arg_data_raw_size()) |
| 196 | + + calc.http_outcall_request_cost(calc.count_request_bytes(&req), agents.len()); |
| 197 | + store::state::receive_cycles(cycles, false); |
| 198 | + |
| 199 | + let result = |
| 200 | + futures::future::select_ok(agents.iter().map(|agent| agent.call(req.clone()).boxed())) |
| 201 | + .await; |
| 202 | + match result { |
| 203 | + Ok((res, _)) => { |
| 204 | + let cycles = |
| 205 | + calc.http_outcall_response_cost(calc.count_response_bytes(&res), agents.len()); |
| 206 | + store::state::receive_cycles(cycles, true); |
| 207 | + res |
| 208 | + } |
| 209 | + Err(res) => res, |
| 210 | + } |
| 211 | +} |
0 commit comments