Improve tracing
This commit is contained in:
parent
3d0a05f3a9
commit
34e665154d
5 changed files with 84 additions and 56 deletions
|
@ -13,7 +13,7 @@ use hyper::Method;
|
|||
use puppy::Context;
|
||||
use serde_json::{json, Value};
|
||||
use tokio::net::TcpListener;
|
||||
use tracing::{info, info_span, trace_span, Instrument as _};
|
||||
use tracing::{error, info, info_span, trace_span, Instrument as _};
|
||||
|
||||
use crate::sig::{Signer, Verdict, Verifier, VERIFIER_MOUNT};
|
||||
|
||||
|
@ -92,11 +92,19 @@ pub async fn start(context: Context) -> Result<(), Box<dyn std::error::Error + S
|
|||
|
||||
tokio::spawn(async move {
|
||||
let service = service_fn(|req| {
|
||||
let span = info_span!("request");
|
||||
let user_agent = req
|
||||
.headers()
|
||||
.get("user-agent")
|
||||
.and_then(|h| h.to_str().ok());
|
||||
let span = info_span!(
|
||||
"request",
|
||||
target = format!("{} {}", req.method().as_str(), req.uri().to_string()),
|
||||
"user-agent" = user_agent,
|
||||
);
|
||||
handle(req, &verifier, cx.clone()).instrument(span)
|
||||
});
|
||||
if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
|
||||
eprintln!("Error serving connection: {:?}", err);
|
||||
error!("Error serving connection: {:?}", err);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -160,22 +168,14 @@ async fn handle(req: Request, verifier: &Verifier, cx: Context) -> Result<Respon
|
|||
let request = {
|
||||
let (req, body) = req.into_parts();
|
||||
let Ok(body) = body.collect().await.map(|b| b.to_bytes()) else {
|
||||
todo!();
|
||||
panic!("could not collect body!");
|
||||
};
|
||||
http::Request::from_parts(req, body)
|
||||
};
|
||||
|
||||
// Simplified representation of a request, so we can pattern match on it more easily in the dispatchers.
|
||||
let req = Req::simplify(&request);
|
||||
|
||||
let user_agent = request
|
||||
.headers()
|
||||
.get("user-agent")
|
||||
.and_then(|h| h.to_str().ok());
|
||||
info! {
|
||||
target = format!("{} {}", request.method(), request.uri()),
|
||||
user_agent,
|
||||
"incoming"
|
||||
};
|
||||
// We'll use the path to pick where specifically to send the request.
|
||||
// Check request signature at the door. Even if it isn't needed for a particular endpoint, failing fast
|
||||
// with a clear error message will save anyone trying to get *their* signatures implementation a major
|
||||
|
@ -195,10 +195,14 @@ async fn handle(req: Request, verifier: &Verifier, cx: Context) -> Result<Respon
|
|||
})),
|
||||
}),
|
||||
};
|
||||
|
||||
// If one of the endpoints gave us an error message, we convert that into a response and then
|
||||
// serve it to the client. In either case, we just serve a response.
|
||||
let response = res.unwrap_or_else(|msg| req.error(msg));
|
||||
info!(status = response.status().as_str(), "finished");
|
||||
let response = res.unwrap_or_else(|msg| {
|
||||
info!("{}: {msg}", msg.status);
|
||||
req.error(msg)
|
||||
});
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
|
@ -210,7 +214,7 @@ const GET: &Method = &Method::GET;
|
|||
/// This function is where all requests to a protected endpoint have to go through. If the request
|
||||
/// was signed but does not target a protected endpoint, this function will fall back to the
|
||||
/// [`dispatch_public`] handler.
|
||||
#[tracing::instrument(skip_all)]
|
||||
#[tracing::instrument(level = "DEBUG", target = "router", skip_all)]
|
||||
async fn dispatch_signed(
|
||||
cx: Context,
|
||||
verifier: &Verifier,
|
||||
|
@ -232,7 +236,7 @@ async fn dispatch_signed(
|
|||
/// Dispatch `req` to an unprotected endpoint. If the requested path does not exist, the
|
||||
/// function will return a 404 response. If the path *does* exist, but the signature is not
|
||||
/// valid, they will also get a 404.
|
||||
#[tracing::instrument(skip_all)]
|
||||
#[tracing::instrument(level = "DEBUG", target = "router", skip_all)]
|
||||
async fn dispatch_public(
|
||||
cx: Context,
|
||||
verifier: &Verifier,
|
||||
|
|
|
@ -8,6 +8,7 @@ use puppy::{
|
|||
get_local_ap_object, Context, Error, Key,
|
||||
};
|
||||
use serde_json::Value;
|
||||
use tracing::{info, instrument};
|
||||
|
||||
use crate::sig::{Signer, Verifier};
|
||||
use super::{error::Message, Response};
|
||||
|
@ -63,6 +64,7 @@ pub async fn outbox(cx: &Context, params: &[(&str, &str)]) -> Result<Response, M
|
|||
}
|
||||
|
||||
/// Handle POSTs to actor inboxes. Requires request signature.
|
||||
#[instrument(skip_all)]
|
||||
pub async fn inbox(
|
||||
cx: &Context,
|
||||
actor_id: &str,
|
||||
|
@ -70,7 +72,15 @@ pub async fn inbox(
|
|||
body: &[u8],
|
||||
) -> Result<Response, Message> {
|
||||
let receiver = actor_id.parse::<Key>().unwrap();
|
||||
let json = serde_json::from_slice(body).unwrap();
|
||||
let json: Value = serde_json::from_slice(body).unwrap();
|
||||
let id = json["id"].to_string();
|
||||
|
||||
info! {
|
||||
inbox = receiver.to_string(),
|
||||
signature = sig.ap_id,
|
||||
"processing object '{id}'",
|
||||
};
|
||||
|
||||
match Activity::from_json(json) {
|
||||
Ok(activity) => {
|
||||
puppy::ingest(&cx, receiver, &activity).await.unwrap();
|
||||
|
|
|
@ -1,14 +1,13 @@
|
|||
//! Verification of HTTP signatures.
|
||||
|
||||
use http::Request;
|
||||
use puppy::config::Config;
|
||||
use puppy::fetch::{
|
||||
signatures::{Private, Public, Signature, SigningKey, VerificationKey, Key},
|
||||
FetchError,
|
||||
};
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use puppy::config::Config;
|
||||
use tracing::{debug, error, info, trace};
|
||||
use tracing::{debug, info, trace};
|
||||
|
||||
/// Checks request signatures.
|
||||
#[derive(Clone)]
|
||||
|
@ -82,7 +81,7 @@ impl Verifier {
|
|||
}
|
||||
/// Does the HTTP signature verification process, and returns a "proof" of the signature in the form
|
||||
/// of the [`Signer`], which contains information about who signed a particular request.
|
||||
#[tracing::instrument(skip_all)]
|
||||
#[tracing::instrument(level = "DEBUG", skip_all)]
|
||||
pub async fn verify<B>(&self, req: &Request<B>) -> Verdict {
|
||||
// TODO: implement the whole verification thing as a middleware so we can intercept requests
|
||||
// like these, instead of coupling this tightly with the router.
|
||||
|
@ -93,7 +92,7 @@ impl Verifier {
|
|||
}
|
||||
|
||||
let Some(header) = req.headers().get("signature") else {
|
||||
info!("request not signed");
|
||||
debug!("request not signed");
|
||||
return Verdict::Unsigned;
|
||||
};
|
||||
|
||||
|
@ -102,14 +101,10 @@ impl Verifier {
|
|||
.expect("signature header value should be valid ascii")
|
||||
.to_string();
|
||||
|
||||
let sig = match Signature::derive(&req) {
|
||||
Err(error) => {
|
||||
info! {
|
||||
reason = error.to_string(),
|
||||
signature = signature_str,
|
||||
"invalid signature",
|
||||
};
|
||||
return Verdict::Rejected { signature_str, reason: error };
|
||||
let sig = match Signature::derive(&req).map_err(|e| e.to_string()) {
|
||||
Err(reason) => {
|
||||
info!(reason, signature_str, "invalid signature");
|
||||
return Verdict::Rejected { signature_str, reason };
|
||||
}
|
||||
Ok(signature) => {
|
||||
trace!("signature parsed");
|
||||
|
@ -122,10 +117,7 @@ impl Verifier {
|
|||
let public_key = match fetch_result {
|
||||
Ok(public_key) => public_key,
|
||||
Err(err) => {
|
||||
info! {
|
||||
reason = err.to_string(),
|
||||
"failed to fetch pubkey",
|
||||
};
|
||||
info!(reason = err.to_string(), "failed to fetch pubkey");
|
||||
return Verdict::Rejected {
|
||||
reason: format!("could not fetch public key: {err}"),
|
||||
signature_str,
|
||||
|
@ -135,16 +127,16 @@ impl Verifier {
|
|||
|
||||
// TODO: verify digest also
|
||||
if let Err(error) = public_key.verify(&sig) {
|
||||
info!(reason = error, "rejected signature");
|
||||
info!(reason = error, "rejected");
|
||||
Verdict::Rejected { signature_str, reason: error }
|
||||
} else {
|
||||
info!(key_owner = public_key.owner, "signature OK");
|
||||
debug!(key_owner = public_key.owner, "accepted");
|
||||
Verdict::Verified(Signer { ap_id: public_key.owner })
|
||||
}
|
||||
}
|
||||
/// Send a request to get the public key from an ID. This request will be signed with the
|
||||
/// verifier actor's public key.
|
||||
#[tracing::instrument(skip_all)]
|
||||
#[tracing::instrument(level = "TRACE", skip_all)]
|
||||
async fn fetch_public_key(&self, uri: &str) -> Result<VerificationKey, FetchError> {
|
||||
let json = puppy::fetch::resolve(&self.signing_key(), uri).await?;
|
||||
let Some(key) = Key::from_json(json) else {
|
||||
|
|
|
@ -2,9 +2,9 @@ use chrono::Utc;
|
|||
use http::Method;
|
||||
use http_body_util::BodyExt as _;
|
||||
use reqwest::Body;
|
||||
use serde_json::Value;
|
||||
use serde_json::{error, Value};
|
||||
use derive_more::Display;
|
||||
use tracing::{debug, info, instrument};
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
|
||||
use crate::{
|
||||
object::Activity,
|
||||
|
@ -38,7 +38,7 @@ impl Client {
|
|||
///
|
||||
/// Note that in order for the request to be considered valid by most implementations, `key.owner`
|
||||
/// must equal `payload.actor`.
|
||||
#[instrument(skip_all, fields(activity = payload.id, url = inbox, key = key.id))]
|
||||
#[instrument(target = "fetch.delivery", skip_all, fields(activity = payload.id, url = inbox, key = key.id))]
|
||||
pub async fn deliver(&self, key: &SigningKey, payload: &Activity, inbox: &str) {
|
||||
let system = Subsystem::Delivery;
|
||||
|
||||
|
@ -60,7 +60,7 @@ impl Client {
|
|||
self.inner.execute(request).await.unwrap();
|
||||
}
|
||||
/// A high-level function to resolve a single ActivityPub ID using a signed request.
|
||||
#[instrument(skip_all, fields(url = url, key = key.id))]
|
||||
#[instrument(target = "fetch.resolver", skip_all, fields(url = url, key = key.id))]
|
||||
pub async fn resolve(&self, key: &SigningKey, url: &str) -> Result<Value, FetchError> {
|
||||
let system = Subsystem::Resolver;
|
||||
|
||||
|
@ -70,19 +70,25 @@ impl Client {
|
|||
.body(())
|
||||
.unwrap();
|
||||
|
||||
key.sign(Options::LEGACY, &req)
|
||||
.map_err(FetchError::Sig)?
|
||||
.commit(&mut req);
|
||||
match key.sign(Options::LEGACY, &req) {
|
||||
Ok(signature) => signature.commit(&mut req),
|
||||
Err(error) => {
|
||||
// This shouldn't happen during normal operation
|
||||
warn!("failed to sign request: {error}");
|
||||
return Err(FetchError::Sig(error));
|
||||
}
|
||||
}
|
||||
|
||||
let request = req.map(|()| Body::default()).try_into()?;
|
||||
let response = self.inner.execute(request).await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
debug!("resolution successful");
|
||||
response.json().await.map_err(From::from)
|
||||
} else {
|
||||
let status = response.status().as_u16();
|
||||
let body = response.text().await?;
|
||||
info!(status, "resolution failed: {body}");
|
||||
debug!(status, "resolution failed: {body}");
|
||||
Err(FetchError::NotSuccess {
|
||||
url: url.to_string(),
|
||||
status,
|
||||
|
@ -93,7 +99,7 @@ impl Client {
|
|||
/// Forwards a request and returns the raw response, so that it can be analyzed for debugging.
|
||||
///
|
||||
/// It exists solely as a debugging tool!
|
||||
#[instrument(skip_all, fields(url, key = key.id))]
|
||||
#[instrument(target = "fetch.devproxy", skip_all, fields(url, key = key.id))]
|
||||
pub async fn proxy(
|
||||
&self,
|
||||
key: &SigningKey,
|
||||
|
@ -111,7 +117,8 @@ impl Client {
|
|||
.expect("signing error")
|
||||
.commit(&mut req);
|
||||
|
||||
let resp = reqwest::Client::new()
|
||||
let resp = self
|
||||
.inner
|
||||
.execute(req.map(|_| Body::default()).try_into().unwrap())
|
||||
.await?;
|
||||
|
||||
|
@ -146,11 +153,16 @@ enum Subsystem {
|
|||
impl Subsystem {
|
||||
/// Get the user agent string for the subsystem.
|
||||
fn user_agent(&self) -> String {
|
||||
format!("{SOFTWARE}/{VERSION} [{}]", match self {
|
||||
format!("{SOFTWARE}/{VERSION} [{}]", self.as_str())
|
||||
}
|
||||
/// Get a str representation of this subsystem.
|
||||
#[inline]
|
||||
const fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Subsystem::Resolver => "resolver",
|
||||
Subsystem::Delivery => "delivery",
|
||||
Subsystem::DevProxy => "devproxy",
|
||||
})
|
||||
}
|
||||
}
|
||||
/// Construct a new request for this subsystem.
|
||||
///
|
||||
|
@ -177,7 +189,11 @@ impl Subsystem {
|
|||
|
||||
let Some(host) = uri.host() else {
|
||||
// SECURITY: Refuse to resolve URLs to local resources using local keys.
|
||||
panic!("refusing to resolve a relative URL: {target}")
|
||||
error!(target: "security", "refusing to resolve a relative URL: {target}");
|
||||
return Err(FetchError::InvalidURI {
|
||||
url: target.to_string(),
|
||||
error: "Relative URI".to_string(),
|
||||
});
|
||||
};
|
||||
|
||||
let req = http::Request::builder()
|
||||
|
|
|
@ -33,6 +33,7 @@ pub mod post;
|
|||
mod interact;
|
||||
|
||||
use derive_more::{From, Display};
|
||||
use tracing::{error, instrument, warn};
|
||||
|
||||
/// Retrieve an ActivityPub object from the database.
|
||||
///
|
||||
|
@ -224,10 +225,10 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
#[derive(From, Debug, Display)]
|
||||
pub enum Error {
|
||||
/// An error internal to the store.
|
||||
#[display(fmt = "{}", self.0)]
|
||||
#[display(fmt = "store error: {}", self.0)]
|
||||
Store(StoreError),
|
||||
/// An error generated by the [fetch] subsystem.
|
||||
#[display(fmt = "{}", self.0)]
|
||||
#[display(fmt = "fetch error: {}", self.0)]
|
||||
Fetch(FetchError),
|
||||
/// Expected `node` to have some property that it doesn't have.
|
||||
#[display(fmt = "missing data: {node} is missing {prop}")]
|
||||
|
@ -237,6 +238,8 @@ pub enum Error {
|
|||
/// Name of the thing it is missing.
|
||||
prop: &'static str,
|
||||
},
|
||||
#[display(fmt = "invalid data: {}", self.0)]
|
||||
Invalid(String),
|
||||
}
|
||||
|
||||
pub mod config {
|
||||
|
@ -252,7 +255,7 @@ pub mod config {
|
|||
/// Interpret an *incoming* activity. Outgoing activities are *never* interpreted through this function,
|
||||
/// because their changes are already in the database.
|
||||
// TODO: figure out if that is the behavior we actually want
|
||||
#[tracing::instrument(skip(cx))]
|
||||
#[instrument(skip_all, fields(activity.id = activity.id))]
|
||||
pub fn interpret(cx: &Context, activity: Activity) -> Result<()> {
|
||||
// Fetch our actor from the database
|
||||
let Some(actor) = cx.store().lookup(Id(activity.actor.clone()))? else {
|
||||
|
@ -313,7 +316,10 @@ pub fn interpret(cx: &Context, activity: Activity) -> Result<()> {
|
|||
};
|
||||
(Key::gen(), tag)
|
||||
}
|
||||
k => panic!("unsupported activity type {k} (id={})", activity.id),
|
||||
k => {
|
||||
warn!(activity.id, "unsupported activity type {k}");
|
||||
return Err(Error::Invalid(format!("activity type '{k}' not supported")));
|
||||
}
|
||||
};
|
||||
cx.run(|tx| {
|
||||
tx.add_alias(key, Id(activity.id.clone()))?;
|
||||
|
@ -327,7 +333,7 @@ pub fn interpret(cx: &Context, activity: Activity) -> Result<()> {
|
|||
}
|
||||
|
||||
/// Make sure all the interesting bits of an activity are here.
|
||||
#[tracing::instrument(skip(cx))]
|
||||
#[instrument(skip_all, fields(activity.id = activity.id, key = auth.to_string()))]
|
||||
pub async fn ingest(cx: &Context, auth: Key, activity: &Activity) -> Result<()> {
|
||||
let key = cx.run(|tx| get_signing_key(tx, actor::Actor { key: auth }).map_err(Error::Store))?;
|
||||
for id in [activity.actor.as_str(), activity.object.id()] {
|
||||
|
|
Loading…
Reference in a new issue