Compare commits

...

3 Commits
master ... dev

Author SHA1 Message Date
Magic_RB a02d58d50b
even more stuff 1 year ago
Magic_RB 6579f0a331
More stuff 1 year ago
Magic_RB 338033b11a
Stuff 1 year ago
  1. 1
      flake.nix
  2. 45
      src/actors/upload_consumer.rs
  3. 30
      src/actors/upload_dispatcher.rs
  4. 58
      src/api/mod.rs
  5. 13
      src/main.rs
  6. 32
      src/messages.rs
  7. 3
      src/types/mod.rs
  8. 49
      src/types/upload_id.rs
  9. 11
      test.bash

1
flake.nix

@ -17,6 +17,7 @@
nativeBuildInputs = with pkgs; [
sqlite.dev
diesel-cli
jq
];
}
);

45
src/actors/upload_consumer.rs

@ -1,25 +1,54 @@
use actix::{Actor, Addr, Context, Handler, Response};
use crate::messages::UploadNar;
use crate::{messages::{BeginWritePath, FinalizeWritePath}, types::{NarInfo, Path, StorePath}};
pub struct UploadConsumer {}
pub struct PartialNarInfo {
store_path: StorePath,
nar_hash: String,
nar_size: u32,
references: Vec<Path>,
deriver: Path,
sig: Vec<String>,
}
// TODO add checks perhaps?
impl From<BeginWritePath> for PartialNarInfo {
fn from(b: BeginWritePath) -> Self {
Self {
store_path: b.store_path,
nar_hash: b.nar_hash,
nar_size: b.nar_size,
references: b.references,
deriver: b.deriver,
sig: b.sig,
}
}
}
pub struct UploadConsumer {
partial_narinfo: PartialNarInfo,
}
impl Actor for UploadConsumer {
type Context = Context<Self>;
}
impl UploadConsumer {
pub fn new() -> Addr<Self> {
pub fn new(begin_write_path: BeginWritePath) -> Addr<Self> {
Self::create(|ctx| {
Self {}
Self {
partial_narinfo: begin_write_path.into()
}
})
}
}
impl Handler<UploadNar> for UploadConsumer {
type Result = ();
impl Handler<FinalizeWritePath> for UploadConsumer {
type Result = Response<NarInfo>;
fn handle(&mut self, msg: UploadNar, ctx: &mut Self::Context) -> Self::Result {
()
fn handle(&mut self, msg: FinalizeWritePath, ctx: &mut Self::Context) -> Self::Result {
println!("{}", msg.id);
todo!()
}
}

30
src/actors/upload_dispatcher.rs

@ -1,13 +1,13 @@
use std::collections::HashMap;
use actix::{Actor, Addr, Context, Handler, Recipient, Response};
use actix::{Actor, Addr, Context, Handler, Recipient, Response, ResponseFuture};
use crate::{messages::{BeginUpload, UploadNar}, types::UploadId};
use crate::{messages::{BeginWritePath, FinalizeWritePath}, types::{NarInfo, UploadId}};
use super::upload_consumer::UploadConsumer;
pub struct UploadDispatcher {
map: HashMap<UploadId, Recipient<UploadNar>>
map: HashMap<UploadId, Recipient<FinalizeWritePath>>
}
impl Actor for UploadDispatcher {
@ -24,14 +24,32 @@ impl UploadDispatcher {
}
}
impl Handler<BeginUpload> for UploadDispatcher {
impl Handler<BeginWritePath> for UploadDispatcher {
type Result = Response<UploadId>;
fn handle(&mut self, msg: BeginUpload, ctx: &mut Self::Context) -> Self::Result {
fn handle(&mut self, msg: BeginWritePath, ctx: &mut Self::Context) -> Self::Result {
let upload_id = UploadId::new();
self.map.insert(upload_id.clone(), UploadConsumer::new().recipient());
self.map.insert(upload_id.clone(), UploadConsumer::new(msg).recipient());
Response::reply(upload_id)
}
}
impl Handler<FinalizeWritePath> for UploadDispatcher {
type Result = ResponseFuture<NarInfo>;
fn handle(&mut self, msg: FinalizeWritePath, ctx: &mut Self::Context) -> Self::Result {
let addr = self.map.get(&msg.id).map(|r| r.to_owned());
Box::pin(async move {
match addr {
Some(s) => match s.send(msg).await {
Ok(narinfo) => narinfo,
Err(e) => todo!()
},
None => todo!(),
}
})
}
}

58
src/api/mod.rs

@ -1,7 +1,7 @@
pub mod upload {
use crate::{messages::BeginUpload, types::{Path, StorePath}};
use crate::{messages::BeginWritePath, types::{Path, StorePath}};
use actix::Recipient;
use actix_web::{HttpResponse, post, web::{Data, Json}};
use actix_web::{HttpResponse, post, web::{self, Data, Json, ReqData}};
use serde::{Deserialize, Serialize};
#[derive(Deserialize)]
@ -14,7 +14,21 @@ pub mod upload {
sig: Vec<String>
}
impl Into<BeginWritePath> for Request {
fn into(self) -> BeginWritePath {
BeginWritePath {
store_path: self.store_path,
nar_hash: self.nar_hash,
nar_size: self.nar_size,
references: self.references,
deriver: self.deriver,
sig: self.sig,
}
}
}
#[derive(Serialize)]
#[serde(untagged)]
pub enum Response {
Ok {
upload_path: String,
@ -23,12 +37,48 @@ pub mod upload {
}
#[post("api_v1/upload")]
pub async fn endpoint(req: Json<Request>, begin_upload: Data<Recipient<BeginUpload>>) -> HttpResponse {
pub async fn endpoint(req: web::Json<Request>, addr: web::Data<Recipient<BeginWritePath>>) -> HttpResponse {
let req = req.into_inner();
match begin_upload.send(BeginUpload {}).await {
match addr.send(req.into()).await {
Ok(o) => HttpResponse::Ok().json(Response::Ok { upload_path: o.to_string() }),
Err(e) => HttpResponse::Conflict().json(Response::AlreadyExists),
}
}
}
pub mod upload_nar {
use std::path::PathBuf;
use actix::Recipient;
use actix_web::{HttpResponse, post, web};
use serde::Serialize;
use crate::{messages::FinalizeWritePath, types::{NarInfo, UploadId}};
#[derive(Serialize)]
#[serde(untagged)]
pub enum Response {
Ok {
#[serde(flatten)]
narinfo: NarInfo
},
Error
}
#[post("api_v1/upload/{upload_id}")]
pub async fn endpoint(nar: web::Payload,
path: web::Path<UploadId>,
addr: web::Data<Recipient<FinalizeWritePath>>) -> HttpResponse {
let upload_id = path.into_inner();
let finalize = FinalizeWritePath {
id: upload_id,
file: PathBuf::new(),
};
match addr.send(finalize).await {
Ok(o) => HttpResponse::Ok().json(Response::Ok { narinfo: o }),
Err(e) => HttpResponse::InternalServerError().json(Response::Error)
}
}
}

13
src/main.rs

@ -2,9 +2,9 @@
extern crate diesel;
use actix::Recipient;
use actix_web::{HttpResponse, web};
use actix_web::web;
use diesel::{EqAll, QueryDsl, RunQueryDsl, connection::Connection, sqlite::SqliteConnection};
use std::{convert::{TryFrom, TryInto}, error::Error, path::{Path as StdPath, PathBuf}};
use std::{convert::{TryFrom, TryInto}, error::Error, path::PathBuf};
mod schema;
mod api;
@ -14,7 +14,7 @@ mod actors;
use schema::paths;
use crate::messages::BeginUpload;
use crate::messages::{BeginWritePath, FinalizeWritePath};
#[derive(Debug)]
struct Path {
@ -150,12 +150,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
let upload_dispatcher = actors::upload_dispatcher::UploadDispatcher::new();
{
let upload_dispatcher = web::Data::new(upload_dispatcher.clone().recipient());
let finalize_write_path: Recipient<FinalizeWritePath> = upload_dispatcher.clone().recipient();
let begin_write_path: Recipient<BeginWritePath> = upload_dispatcher.clone().recipient();
actix_web::HttpServer::new(move || {
actix_web::App::new()
.service(api::upload::endpoint)
.app_data(upload_dispatcher.clone())
.service(api::upload_nar::endpoint)
.app_data(web::Data::new(finalize_write_path.clone()))
.app_data(web::Data::new(begin_write_path.clone()))
}).bind("localhost:8099").map(|server| server.run())?.await?;
}

32
src/messages.rs

@ -1,24 +1,30 @@
use std::path::PathBuf;
use actix::Message;
use crate::types::{Path, UploadId, StorePath};
use crate::types::{NarInfo, Path, StorePath, UploadId};
#[derive(Message)]
#[rtype(result = "()")]
pub struct UploadNar {}
#[rtype(result = "NarInfo")]
pub struct FinalizeWritePath {
pub id: UploadId,
pub file: PathBuf,
}
#[derive(Message)]
#[rtype(result = "UploadId")]
pub struct BeginUpload {}
pub struct BeginWritePath {
pub store_path: StorePath,
pub nar_hash: String,
pub nar_size: u32,
pub references: Vec<Path>,
pub deriver: Path,
pub sig: Vec<String>,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct WritePath {
store_path: StorePath,
nar_hash: String,
nar_size: u32,
references: Vec<Path>,
deriver: StorePath,
sig: Vec<String>,
pub struct UploadFinalized {
pub file: PathBuf,
pub narinfo: NarInfo
}

3
src/types/mod.rs

@ -10,6 +10,9 @@ pub use hash::Hash;
mod upload_id;
pub use upload_id::UploadId;
mod narinfo;
pub use narinfo::NarInfo;
#[cfg(test)]
mod test {
use super::*;

49
src/types/upload_id.rs

@ -1,6 +1,7 @@
use std::{convert::TryInto, fmt, fmt::Display};
use rand::{Rng, distributions::Uniform, thread_rng};
use serde::{Deserialize, Deserializer, Serialize, de::Visitor};
#[derive(PartialEq, Eq, Hash, Clone)]
pub struct UploadId {
@ -29,6 +30,54 @@ impl fmt::Debug for UploadId {
}
}
impl Serialize for UploadId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer {
serializer.serialize_str(self.to_string().as_str())
}
}
struct UploadIdVisitor;
impl<'de> Visitor<'de> for UploadIdVisitor {
type Value = UploadId;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a base36 encoded value separated into 5 groups by 5 symbols, delimited by `-`")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Self::Value::parse(v).ok_or(E::custom("Invalid upload id"))
}
fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
self.visit_str(v)
}
fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
self.visit_str(&v)
}
}
impl<'de> Deserialize<'de> for UploadId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de> {
deserializer.deserialize_str(UploadIdVisitor)
}
}
impl UploadId {
pub fn new() -> Self {
let rng = thread_rng();

11
test.bash

@ -1,6 +1,6 @@
#! /bin/bash
curl -d \
NARINFO=\
'{
"store_path": "/nix/store/dzyimsdk9yq7x6g24r79ipg3vbalyyy1-libidn2-2.3.1",
"nar_hash": "sha256:1rizfnla4lyjls0d6dpf195r5xm6mz1z34xg64pnirrdrlsqrksa",
@ -13,4 +13,11 @@ curl -d \
"cache.nixos.org-1:LuHqfckGdiPXBgpc1KYl49TCqHBjg85lFuJGb8UL93Z7OMc2Tl2+8MC081CWZ2lBx4ZkN0rc1jT21uInH0rlBw=="
],
"deriver": "vvikw51p1mrdw7lkqnnj16ha3612vp18-libidn2-2.3.1.drv"
}' -H 'Content-Type: application/json' http://localhost:8099/api_v1/upload
}'
UPLOAD_ID=$(curl -d "${NARINFO}" \
-H 'Content-Type: application/json' http://localhost:8099/api_v1/upload |\
jq -r '.upload_path')
printf '\n'
curl -X POST "http://localhost:8099/api_v1/upload/${UPLOAD_ID}"

Loading…
Cancel
Save