Skip to content
Snippets Groups Projects
Commit fcf44186 authored by DeathwingTheBoss's avatar DeathwingTheBoss
Browse files

- Handle batch calls

- Allow API calls that aren't necessarily sent with right header

Co-authored-by: default avatarBora <kulkalkul@users.noreply.github.com>
parent 8209172a
No related branches found
No related tags found
No related merge requests found
...@@ -559,7 +559,7 @@ checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" ...@@ -559,7 +559,7 @@ checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257"
[[package]] [[package]]
name = "drone" name = "drone"
version = "0.1.3" version = "0.1.4"
dependencies = [ dependencies = [
"actix-web", "actix-web",
"config", "config",
......
[package] [package]
name = "drone" name = "drone"
version = "0.1.3" version = "0.1.4"
edition = "2021" edition = "2021"
authors = ["Deathwing <hi@deathwing.me>"] authors = ["Deathwing <hi@deathwing.me>"]
description = "A caching reverse-proxy application for the Hive blockchain." description = "A caching reverse-proxy application for the Hive blockchain."
......
use std::{time::Duration, sync::Mutex}; use std::{time::Duration, sync::Mutex};
use actix_web::{web::{self, JsonConfig}, App, HttpResponse, HttpServer, Responder, HttpRequest}; use actix_web::{web, App, HttpResponse, HttpServer, Responder, HttpRequest};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use reqwest::{Client, ClientBuilder}; use reqwest::{Client, ClientBuilder};
use lru_time_cache::LruCache; use lru_time_cache::LruCache;
...@@ -38,9 +38,17 @@ async fn index(appdata: web::Data<AppData>) -> impl Responder { ...@@ -38,9 +38,17 @@ async fn index(appdata: web::Data<AppData>) -> impl Responder {
}) })
} }
// Enum for API Requests, either single or batch.
#[derive(Deserialize, Debug)]
#[serde(untagged)]
enum APICall {
Single(APIRequest),
Batch(Vec<APIRequest>),
}
// Structure for API calls. // Structure for API calls.
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
struct APICall { struct APIRequest {
jsonrpc: String, jsonrpc: String,
id: u32, id: u32,
method: String, method: String,
...@@ -52,7 +60,7 @@ struct APICall { ...@@ -52,7 +60,7 @@ struct APICall {
struct ErrorStructure { struct ErrorStructure {
code: i32, code: i32,
message: String, message: String,
error_data: String, error: String,
} }
// Enum for the endpoints. // Enum for the endpoints.
...@@ -62,6 +70,11 @@ enum Endpoints { ...@@ -62,6 +70,11 @@ enum Endpoints {
HIVEMIND, HIVEMIND,
} }
struct APICallResponse {
result: Value,
cached: bool,
}
// Choose the endpoint depending on the method. // Choose the endpoint depending on the method.
impl Endpoints { impl Endpoints {
fn choose_endpoint<'a>(&self, appdata: &'a web::Data<AppData>) -> &'a str { fn choose_endpoint<'a>(&self, appdata: &'a web::Data<AppData>) -> &'a str {
...@@ -73,48 +86,24 @@ impl Endpoints { ...@@ -73,48 +86,24 @@ impl Endpoints {
} }
} }
async fn api_call(req: HttpRequest, call: web::Json<APICall>, data: web::Data<AppData>) -> impl Responder { async fn handle_request(request: &APIRequest, data: &web::Data<AppData>, client_ip: &String ) -> Result<APICallResponse, ErrorStructure> {
// Convert the call to a struct. // Convert the call to a struct.
let call = call.into_inner();
let client = data.webclient.clone(); let client = data.webclient.clone();
let json_rpc_call = APICall { // If there's a single call, just forward it.
jsonrpc: "2.0".to_string(),
id: call.id,
method: call.method,
params: call.params,
};
// Get humantime for logging. // Get humantime for logging.
let human_timestamp = humantime::format_rfc3339_seconds(std::time::SystemTime::now()); let human_timestamp = humantime::format_rfc3339_seconds(std::time::SystemTime::now());
// Log the request, if there's Cloudflare header (CF-Connecting-IP) use that instead of peer_addr.
let get_cloudflare_ip = req.headers().get("CF-Connecting-IP");
let client_ip = match get_cloudflare_ip {
Some(ip) => ip.to_str().map(|ip| ip.to_string()),
None => Ok(req.peer_addr().unwrap().ip().to_string()),
};
let client_ip = match client_ip {
Ok(ip) => ip,
Err(_) => return HttpResponse::InternalServerError().json(ErrorStructure {
code: 9999,
message: "Internal Server Error".to_string(),
error_data: "Invalid Cloudflare Proxy Header.".to_string(),
}),
};
let formatted_log = let formatted_log =
format!( format!(
"Timestamp: {} || IP: {} || Request Method: {} || Request Params: {}", "Timestamp: {} || IP: {} || Request Method: {} || Request Params: {}",
human_timestamp, human_timestamp,
client_ip, client_ip,
json_rpc_call.method, request.method,
json_rpc_call.params, request.params,
); );
println!("{}", formatted_log); println!("{}", formatted_log);
// Pick the endpoints depending on the method. // Pick the endpoints depending on the method.
let endpoints = match json_rpc_call.method.as_str() { let endpoints = match request.method.as_str() {
// HAF // HAF
"condenser_api.get_block" => Endpoints::HAF, "condenser_api.get_block" => Endpoints::HAF,
"block_api.get_block_range" => Endpoints::HAF, "block_api.get_block_range" => Endpoints::HAF,
...@@ -158,63 +147,117 @@ async fn api_call(req: HttpRequest, call: web::Json<APICall>, data: web::Data<Ap ...@@ -158,63 +147,117 @@ async fn api_call(req: HttpRequest, call: web::Json<APICall>, data: web::Data<Ap
_anything_else => Endpoints::HAF, _anything_else => Endpoints::HAF,
}; };
// Cache Status
let mut cache_status = "MISS";
// Check if the call is in the cache. // Check if the call is in the cache.
if let Some(cached_call) = if let Some(cached_call) =
data.cache.lock().unwrap().get(&json_rpc_call.params.to_string()) { data.cache.lock().unwrap().get(&request.params.to_string()) {
cache_status = "HIT"; return Ok(APICallResponse
return HttpResponse::Ok() {
.insert_header(("Drone-Version", DRONE_VERSION)) result: cached_call.clone(),
.insert_header(("Cache-Status", cache_status)) cached: true,
.json(cached_call); });
} }
// Send the request to the endpoints. // Send the request to the endpoints.
let res = match client.post(endpoints.choose_endpoint(&data)) let res = match client.post(endpoints.choose_endpoint(&data))
.json(&json_rpc_call) .json(&request)
.send() .send()
.await { .await {
Ok(response) => response, Ok(response) => response,
Err(err) => return HttpResponse::InternalServerError().json(ErrorStructure { Err(err) => return Err(ErrorStructure {
code: 1000, code: 1000,
message: format!("Unable to send the request to the endpoints."), message: format!("Unable to send request to endpoint."),
error_data: err.to_string(), error: err.to_string(),
}), }),
}; };
let body = match res.text().await { let body = match res.text().await {
Ok(text) => text, Ok(text) => text,
Err(err) => return HttpResponse::InternalServerError().json(ErrorStructure { Err(err) => return Err(ErrorStructure {
code: 2000, code: 2000,
message: format!("Received an invalid response from the endpoints."), message: format!("Received an invalid response from the endpoint."),
error_data: err.to_string(), error: err.to_string(),
}), }),
}; };
let json_body: serde_json::Value = match serde_json::from_str(&body) { let json_body: serde_json::Value = match serde_json::from_str(&body) {
Ok(parsed) => parsed, Ok(parsed) => parsed,
Err(err) => return HttpResponse::InternalServerError().json(ErrorStructure { Err(err) => return Err(ErrorStructure {
code: 3000, code: 3000,
message: format!("Unable to parse endpoints data."), message: format!("Unable to parse endpoint data."),
error_data: err.to_string(), error: err.to_string(),
}), }),
}; };
if json_body["result"].is_array() && if json_body["result"].is_array() &&
json_body["result"].as_array().unwrap().is_empty() || json_body["result"].is_null() { json_body["result"].as_array().unwrap().is_empty() || json_body["result"].is_null() {
return HttpResponse::InternalServerError().json(ErrorStructure { return Err(ErrorStructure {
code: 4000, code: 4000,
message: format!("Unable to parse endpoints data."), message: format!("Unable to parse endpoint data."),
error_data: "The endpoint returned an empty result.".to_string(), error: "The endpoint returned an empty result.".to_string(),
}); });
} }
if DRONE_CACHEABLE_METHODS.contains(&json_rpc_call.method.as_str()) { if DRONE_CACHEABLE_METHODS.contains(&request.method.as_str()) {
// Insert the call in the cache. data.cache.lock().unwrap().insert(request.params.to_string(), json_body.clone());
data.cache.lock().unwrap().insert(json_rpc_call.params.to_string(), json_body);
} }
Ok(APICallResponse
{
result: json_body,
cached: false,
})
}
async fn api_call(req: HttpRequest, call: web::Json<APICall>, data: web::Data<AppData>) -> impl Responder {
// Log the request, if there's Cloudflare header (CF-Connecting-IP) use that instead of peer_addr.
let get_cloudflare_ip = req.headers().get("CF-Connecting-IP");
let client_ip = match get_cloudflare_ip {
Some(ip) => ip.to_str().map(|ip| ip.to_string()),
None => Ok(req.peer_addr().unwrap().ip().to_string()),
};
let user_ip = match client_ip {
Ok(ip) => ip,
Err(_) => return HttpResponse::InternalServerError().json(ErrorStructure {
code: 9999,
message: "Internal Server Error".to_string(),
error: "Invalid Cloudflare Proxy Header.".to_string(),
}),
};
let response = match call.0 {
APICall::Single(request) => {
let result = handle_request(&request, &data, &user_ip).await;
match result {
Ok(response) => response,
Err(err) => return HttpResponse::InternalServerError().json(err),
}
},
APICall::Batch(requests) => {
let mut responses = Vec::new();
for request in requests {
let result = handle_request(&request, &data, &user_ip).await;
match result {
Ok(response) => responses.push(response),
Err(err) => return HttpResponse::InternalServerError().json(err),
}
}
let mut cached = true;
let mut result = Vec::new();
for response in responses {
if response.cached == false {
cached = false;
}
result.push(response.result);
}
APICallResponse
{
result: serde_json::Value::Array(result),
cached: cached,
}
},
};
HttpResponse::Ok() HttpResponse::Ok()
.insert_header(("Drone-Version", DRONE_VERSION)) .insert_header(("Drone-Version", DRONE_VERSION))
.insert_header(("Cache-Status", cache_status)) .insert_header(("Cache-Status", response.cached.to_string()))
.body(body) .json(&response.result)
} }
...@@ -241,7 +284,6 @@ struct DroneConfig { ...@@ -241,7 +284,6 @@ struct DroneConfig {
actix_connection_threads: usize, actix_connection_threads: usize,
} }
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
// Load config. // Load config.
...@@ -266,8 +308,8 @@ async fn main() -> std::io::Result<()> { ...@@ -266,8 +308,8 @@ async fn main() -> std::io::Result<()> {
println!("Drone is running on port {}.", config.port); println!("Drone is running on port {}.", config.port);
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.app_data(web::JsonConfig::default().content_type(|_| true))
.app_data(_cache.clone()) .app_data(_cache.clone())
.app_data(JsonConfig::default().content_type_required(false))
.route("/", web::get().to(index)) .route("/", web::get().to(index))
.route("/", web::post().to(api_call)) .route("/", web::post().to(api_call))
.route("/health", web::get().to(index)) .route("/health", web::get().to(index))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment