Initial seemingly WORKING commit!

This commit is contained in:
Olivier 'reivilibre' 2021-11-07 19:39:13 +00:00
commit baf80db238
7 changed files with 3423 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
/target
/.idea
/.env

2793
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

20
Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "mxmonzo"
version = "0.0.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
matrix-sdk = { version = "0.4.1", features = [ "encryption", "sled_cryptostore" ] }
#monzo-lib = "0.4.0"
monzo-lib = { git = "https://github.com/danieleades/monzo-lib.git", rev = "e54ff827" }
tokio = { version = "1.13.0", features = [ "full" ] }
anyhow = "1.0.45"
warp = "0.3.1"
url = "2.2.2"
serde = { version = "1.0.130", features = [ "derive" ] }
serde_json = "1.0.69"
reqwest = { version = "0.11", features = [ "json" ] }
envy = "0.4"

139
src/main.rs Normal file
View File

@ -0,0 +1,139 @@
use crate::monzo::monzo_client_freshened;
use crate::state::{Config, MonzoState, State, StateInner};
use crate::web::warp_main;
use matrix_sdk::room::Room;
use matrix_sdk::ruma::events::room::message::{MessageEventContent, MessageType};
use matrix_sdk::ruma::events::SyncMessageEvent;
use matrix_sdk::ruma::UserId;
use matrix_sdk::{Client, ClientConfig, SyncSettings};
use std::convert::TryFrom;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;
pub mod monzo;
pub mod state;
pub mod web;
async fn on_room_message(
event: &SyncMessageEvent<MessageEventContent>,
room: Room,
state: &State,
) -> anyhow::Result<()> {
if let Room::Joined(room) = room {
if *room.room_id() != state.config.matrix_room {
return Ok(());
}
if let MessageType::Text(body) = &event.content.msgtype {
if body.body.as_str() == "?" {
// Respond with our balances.
if let Some(client) = monzo_client_freshened(state).await? {
let mut buf = String::new();
let state_inner = state.inner.read().await;
for (acc_id, acc_name) in
state_inner.monzo_state.as_ref().unwrap().accounts.iter()
{
let balance = client.balance(acc_id).await?;
if !buf.is_empty() {
buf.push('\n');
}
if &balance.currency == "GBP" {
let bal_quid = balance.balance / 100;
let bal_pennies = balance.balance % 100;
buf.push_str(&format!("{}: £{}.{:02}", acc_name, bal_quid, bal_pennies))
} else {
buf.push_str("?currency");
}
}
let content = MessageEventContent::text_plain(&buf);
room.send(content, None).await.unwrap();
} else {
let content = MessageEventContent::text_plain("Not linked :(.");
room.send(content, None).await.unwrap();
}
}
}
}
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Load the config from the environment!
let config: Config = envy::from_env()?;
let state_inner = StateInner::default();
let mxid = UserId::try_from(config.matrix_id.clone())?;
let store_path = config.matrix_store.clone();
let client_config = ClientConfig::new().store_path(store_path);
let client = Client::new_from_user_id_with_config(mxid.clone(), client_config).await?;
let state = State {
config: Arc::new(config.clone()),
inner: Arc::new(RwLock::new(state_inner)),
matrix_client: Arc::new(client.clone()),
};
eprintln!("Logging in!");
client
.login(
mxid.localpart(),
&config.matrix_password,
Some("mxmonzo"),
Some("rei's MxMonzo"),
)
.await?;
eprintln!("Syncing once!");
client.sync_once(SyncSettings::new()).await?;
if config.monzo_persist.exists() {
eprintln!("Loading found monzo persist data.");
let mut buf = Vec::new();
tokio::fs::File::open(&config.monzo_persist)
.await?
.read_to_end(&mut buf)
.await?;
let monzo_state: MonzoState = serde_json::from_slice(&buf)?;
let mut state_inner = state.inner.write().await;
state_inner.monzo_state = Some(monzo_state);
} else {
eprintln!("(Monzo persist file does not exist.)");
}
eprintln!("Starting warp.");
let _warp_task = tokio::task::spawn(warp_main(state.clone()));
let state_arc = Arc::new(state);
client
.register_event_handler(
move |ev: SyncMessageEvent<MessageEventContent>, room: Room| {
let state_arc = state_arc.clone();
async move {
if let Err(error) = on_room_message(&ev, room, &state_arc).await {
eprintln!("Error handling message {}: {:?}", ev.event_id, error);
}
}
},
)
.await;
// Syncing is important to synchronise the client state with the server.
// This method will never return.
client
.sync(
SyncSettings::default().token(
client
.sync_token()
.await
.expect("Just synced, should have a token."),
),
)
.await;
Ok(())
}

174
src/monzo.rs Normal file
View File

@ -0,0 +1,174 @@
use crate::state::{persist_monzo_state, MonzoState, State};
use monzo::inner_client::Quick;
use serde::Deserialize;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[derive(Deserialize)]
struct MonzoAuthResponse {
access_token: String,
expires_in: u64,
refresh_token: String,
#[allow(dead_code)]
client_id: String,
#[allow(dead_code)]
token_type: String,
#[allow(dead_code)]
user_id: String,
}
pub async fn monzo_client_from_code(
state: &State,
code: &str,
) -> anyhow::Result<monzo::Client<monzo::inner_client::Quick>> {
// obtain access & refresh token from Monzo
let client = reqwest::Client::new();
let auth_resp: MonzoAuthResponse = client
.post("https://api.monzo.com/oauth2/token")
.form(&[
("grant_type", "authorization_code"),
("client_id", &state.config.monzo_client_id),
("client_secret", &state.config.monzo_client_secret),
(
"redirect_uri",
&format!("{}/auth_done", state.config.base_uri),
),
("code", code),
])
.send()
.await?
.json()
.await?;
let mut state_inner = state.inner.write().await;
state_inner.monzo_state = Some(MonzoState {
access_token: auth_resp.access_token.clone(),
access_token_expiry: (SystemTime::now() + Duration::from_secs(auth_resp.expires_in))
.duration_since(UNIX_EPOCH)?
.as_secs(),
refresh_token: auth_resp.refresh_token,
accounts: Default::default(),
});
let client = monzo::Client::new(auth_resp.access_token);
drop(state_inner);
persist_monzo_state(state).await?;
Ok(client)
}
pub async fn monzo_client_freshened(
state: &State,
) -> anyhow::Result<Option<monzo::Client<monzo::inner_client::Quick>>> {
let state_inner = state.inner.read().await;
let monzo_state = match &state_inner.monzo_state {
None => {
return Ok(None);
}
Some(ms) => ms,
};
// Allow for 2 minute buffer.
let exp_at_instant = UNIX_EPOCH + Duration::from_secs(monzo_state.access_token_expiry - 120);
let refresh_token = monzo_state.refresh_token.clone();
drop(state_inner);
let needs_refresh = exp_at_instant < SystemTime::now();
if needs_refresh {
let client = reqwest::Client::new();
let auth_resp: MonzoAuthResponse = client
.post("https://api.monzo.com/oauth2/token")
.form(&[
("grant_type", "refresh_token"),
("client_id", &state.config.monzo_client_id),
("client_secret", &state.config.monzo_client_secret),
("refresh_token", &refresh_token),
])
.send()
.await?
.json()
.await?;
let mut state_inner = state.inner.write().await;
let monzo_state = state_inner.monzo_state.as_mut().unwrap();
monzo_state.access_token = auth_resp.access_token;
monzo_state.refresh_token = auth_resp.refresh_token;
monzo_state.access_token_expiry = (SystemTime::now()
+ Duration::from_secs(auth_resp.expires_in))
.duration_since(UNIX_EPOCH)?
.as_secs();
}
let state_inner = state.inner.read().await;
let monzo_state = state_inner.monzo_state.as_ref().unwrap();
let client = monzo::Client::new(monzo_state.access_token.clone());
if needs_refresh {
persist_monzo_state(state).await?;
}
Ok(Some(client))
}
#[derive(Deserialize)]
pub struct Webhook {
pub account_id: String,
pub id: String,
pub url: String,
}
#[derive(Deserialize)]
pub struct WebhookList {
pub webhooks: Vec<Webhook>,
}
pub async fn list_webhooks(
client: &monzo::Client<Quick>,
account_id: &str,
) -> anyhow::Result<Vec<Webhook>> {
let at = client.access_token();
let client = reqwest::Client::new();
let response: WebhookList = client
.get("https://api.monzo.com/webhooks")
.query(&[("account_id", account_id)])
.header("Authorization", format!("Bearer {}", at))
.send()
.await?
.json()
.await?;
Ok(response.webhooks)
}
pub async fn delete_webhook(client: &monzo::Client<Quick>, webhook_id: &str) -> anyhow::Result<()> {
let at = client.access_token();
let client = reqwest::Client::new();
if !client
.delete(format!("https://api.monzo.com/webhooks/{}", webhook_id))
.header("Authorization", format!("Bearer {}", at))
.send()
.await?
.status()
.is_success()
{
anyhow::bail!("Not successful: delete");
}
Ok(())
}
pub async fn create_webhook(
client: &monzo::Client<Quick>,
account_id: &str,
url: &str,
) -> anyhow::Result<()> {
let at = client.access_token();
let client = reqwest::Client::new();
if !client
.post("https://api.monzo.com/webhooks")
.header("Authorization", format!("Bearer {}", at))
.form(&[("account_id", account_id), ("url", url)])
.send()
.await?
.status()
.is_success()
{
anyhow::bail!("Not successful: register hook");
}
Ok(())
}

53
src/state.rs Normal file
View File

@ -0,0 +1,53 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;
#[derive(Clone)]
pub struct State {
pub config: Arc<Config>,
pub inner: Arc<RwLock<StateInner>>,
pub matrix_client: Arc<matrix_sdk::Client>,
}
#[derive(Clone, Deserialize)]
pub struct Config {
pub matrix_id: String,
pub matrix_room: String,
pub matrix_store: PathBuf,
pub matrix_password: String,
pub monzo_persist: PathBuf,
pub monzo_client_id: String,
pub monzo_client_secret: String,
pub base_uri: String,
}
#[derive(Default)]
pub struct StateInner {
pub monzo_state: Option<MonzoState>,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct MonzoState {
pub access_token: String,
pub access_token_expiry: u64,
pub refresh_token: String,
/// Account IDs to names
pub accounts: HashMap<String, String>,
}
pub async fn persist_monzo_state(state: &State) -> anyhow::Result<()> {
let monzo_state = state
.inner
.read()
.await
.monzo_state
.clone()
.ok_or_else(|| anyhow::anyhow!("No monzo state to persist"))?;
let mut file = tokio::fs::File::create(&state.config.monzo_persist).await?;
let json_data = serde_json::to_vec(&monzo_state)?;
file.write_all(&json_data).await?;
Ok(())
}

241
src/web.rs Normal file
View File

@ -0,0 +1,241 @@
use crate::monzo::{
create_webhook, delete_webhook, list_webhooks, monzo_client_freshened, monzo_client_from_code,
};
use crate::state::State;
use matrix_sdk::room::Room;
use matrix_sdk::ruma::events::room::message::MessageEventContent;
use matrix_sdk::ruma::RoomId;
use monzo::accounts::Type;
use serde::Deserialize;
use std::collections::HashMap;
use std::str::FromStr;
use url::Url;
use warp::http::{Response, StatusCode};
use warp::Filter;
fn wrap<T: warp::Reply + 'static>(
ok_or_err: anyhow::Result<T>,
) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
match ok_or_err {
Ok(inner) => Ok(Box::new(inner)),
Err(err) => {
eprintln!("Error handling request {:?}", err);
Ok(Box::new(warp::reply::with_status(
"An error has occurred.",
StatusCode::INTERNAL_SERVER_ERROR,
)))
}
}
}
async fn auth_setup(
/* host: String, forwarded_proto: Option<String>, */ state: State,
) -> Result<impl warp::Reply, warp::Rejection> {
// let final_redirect_uri = format!("{}://{}/auth_done", forwarded_proto.as_deref().unwrap_or("http"), host);
let final_redirect_uri = std::env::var("MONZO_REDIRECT_URI").unwrap();
let mut url = Url::parse("https://auth.monzo.com/").unwrap();
url.query_pairs_mut()
.append_pair("client_id", &state.config.monzo_client_id)
.append_pair("redirect_uri", &final_redirect_uri)
.append_pair("response_type", "code");
Ok(Response::builder()
.status(StatusCode::SEE_OTHER)
.header("Location", url.as_str())
.body("Redirecting to log in..."))
}
#[derive(Deserialize)]
struct AuthDone {
code: String,
}
async fn auth_done(query: AuthDone, state: State) -> anyhow::Result<impl warp::Reply> {
eprintln!("code {}", query.code);
let _client = monzo_client_from_code(&state, &query.code).await?;
Ok("Success! Please authorise in Monzo, then click <a href='/auth_confirmed'>here</a>.")
}
async fn auth_done_wrapped(
query: AuthDone,
state: State,
) -> Result<impl warp::Reply, warp::Rejection> {
wrap(auth_done(query, state).await)
}
async fn auth_confirmed(state: State) -> anyhow::Result<impl warp::Reply> {
let client = monzo_client_freshened(&state)
.await?
.ok_or_else(|| anyhow::anyhow!("Monzo client has not been set up yet."))?;
let accounts = client.accounts().await?;
let mut account_names = HashMap::new();
for account in accounts {
let acc_type = match account.account_type {
Type::UkBusiness => "business",
Type::UkRetail => "personal",
Type::UkRetailJoint => "joint",
_ => "?type",
};
// create/overwrite webhooks
for hook in list_webhooks(&client, &account.id).await? {
delete_webhook(&client, &hook.id).await?;
}
create_webhook(
&client,
&account.id,
&format!("{}/hook", state.config.base_uri),
)
.await?;
account_names.insert(account.id, acc_type.to_string());
}
let mut state_inner = state.inner.write().await;
state_inner.monzo_state.as_mut().unwrap().accounts = account_names;
Ok("Success!")
}
async fn auth_confirmed_wrapped(state: State) -> Result<impl warp::Reply, warp::Rejection> {
wrap(auth_confirmed(state).await)
}
#[derive(Deserialize)]
struct MonzoHook {
#[serde(rename = "type")]
kind: String,
#[serde(rename = "data")]
payload: serde_json::Value,
}
#[derive(Deserialize)]
struct MonzoTransactionCreated {
amount: i64,
merchant: Option<Merchant>,
account_id: String,
counterparty: Option<Counterparty>,
}
#[derive(Deserialize)]
struct Merchant {
name: String,
}
#[derive(Deserialize)]
struct Counterparty {
name: Option<String>,
}
async fn monzo_hook(hook: MonzoHook, state: State) -> anyhow::Result<impl warp::Reply> {
if &hook.kind == "transaction.created" {
eprintln!("TODO transaction hook (pre-processed)");
eprintln!("---\n{:#?}\n---", hook.payload);
let txn_created: MonzoTransactionCreated = serde_json::from_value(hook.payload)?;
eprintln!("TODO transaction hook.");
let room = state
.matrix_client
.get_room(&RoomId::from_str(&state.config.matrix_room)?)
.ok_or_else(|| anyhow::anyhow!("Matrix room not found!"))?;
if let Room::Joined(room) = room {
let mut buf = String::new();
// +
if txn_created.amount < 0 {
buf.push('');
} else {
buf.push('+');
}
let quid = txn_created.amount.abs() / 100;
let pennies = txn_created.amount.abs() % 100;
buf.push_str(&format!("£{}.{:02} (", quid, pennies));
if let Some(merchant) = txn_created.merchant {
buf.push_str(&merchant.name);
} else if let Some(counterparty) = txn_created.counterparty {
if let Some(name) = counterparty.name {
buf.push_str(&name);
} else {
buf.push_str("?c?");
}
} else {
buf.push_str("???");
}
buf.push(')');
let state_inner = state.inner.read().await;
let on_account_name = state_inner
.monzo_state
.as_ref()
.map(|x| x.accounts.get(&txn_created.account_id))
.flatten()
.map(String::as_str)
.unwrap_or("unk");
buf.push_str(&format!(" on {}", on_account_name));
if let Some(monzo_client) = monzo_client_freshened(&state).await? {
// include new balance
let new_bal = monzo_client.balance(&txn_created.account_id).await?.balance;
let now_quid = new_bal / 100;
let now_pennies = new_bal % 100;
buf.push_str(&format!(" now £{}.{:02}", now_quid, now_pennies));
}
let content = MessageEventContent::text_plain(&buf);
room.send(content, None).await.unwrap();
} else {
eprintln!("Not in room.");
}
} else {
eprintln!("Unknown webhook kind: {}", hook.kind);
}
Ok("")
}
async fn monzo_hook_wrapped(
body: MonzoHook,
state: State,
) -> Result<impl warp::Reply, warp::Rejection> {
wrap(monzo_hook(body, state).await)
}
/// Entry point for the warp-based HTTP server component.
/// What it's for:
/// - setting up Monzo access tokens
/// - receiving Monzo webhooks
pub async fn warp_main(state: State) {
let with_state = warp::any().map(move || state.clone());
let auth_setup_route = warp::get()
.and(warp::path("auth_setup"))
// .and(warp::header("Host"))
// .and(warp::header::optional("X-Forwarded-Proto"))
.and(with_state.clone())
.and_then(auth_setup);
let auth_done_route = warp::get()
.and(warp::path("auth_done"))
.and(warp::query())
.and(with_state.clone())
.and_then(auth_done_wrapped);
let auth_confirmed_route = warp::get()
.and(warp::path("auth_confirmed"))
.and(with_state.clone())
.and_then(auth_confirmed_wrapped);
let hook_route = warp::post()
.and(warp::path("hook"))
.and(warp::body::json())
.and(with_state.clone())
.and_then(monzo_hook_wrapped);
let routes = auth_setup_route
.or(auth_done_route)
.or(auth_confirmed_route)
.or(hook_route);
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}