Browse Source

Worker rework and some crypto stuff

pull/2/head
Magic_RB 1 year ago
parent
commit
9b9e6c4350
  1. 76
      benches/worker.rs
  2. 6
      crypto/src/nonce.rs
  3. 2
      crypto/src/private_key.rs
  4. BIN
      images/graph.png
  5. BIN
      images/network_icon.svg
  6. 11
      src/lib.rs
  7. 4
      src/main.rs
  8. 26
      src/netstar/mod.rs
  9. 21
      src/netstar/packet/mod.rs
  10. 14
      src/netstar/packet/parser.rs
  11. 128
      src/netstar/worker.rs

76
benches/worker.rs

@ -0,0 +1,76 @@
use std::{convert::TryInto, time::Instant};
use actix::SyncArbiter;
use bytes::BytesMut;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use crypto::{SharedKey, XChaCha20Poly1305};
use rand::Rng;
use netstar::{Encrypt, Packet, Worker};
pub fn worker_benchmark(c: &mut Criterion) {
let sizes = [256, 512, 1024];
let threads = [1, 2, 4, 8, 12];
let mut group = c.benchmark_group("encrypt");
let nested = sizes
.iter()
.map(|size| threads.iter().map(move |thread| (size, thread)))
.flatten();
for (size, thread) in nested {
group.throughput(Throughput::Bytes(*size as u64));
group.bench_with_input(
BenchmarkId::from_parameter(format!("size-{}-thread-{}", size, thread)),
&(size, thread),
|b, &(size, thread)| {
let mut rng = rand::thread_rng();
let key = unsafe {
std::mem::transmute::<_, SharedKey>(*b"1d4f47r4f2asd514sdasd75882';'][]")
};
let xaed = XChaCha20Poly1305::new(&key);
let actix_rt = actix_rt::System::new("actix");
let addr = SyncArbiter::start(*thread, || Worker::new());
b.iter_custom(|iters| {
let data = (0..iters)
.map(|_| (0..*size).map(|_| rng.gen::<u8>()).collect::<BytesMut>())
.map(|plaintext| Packet {
id_hash: b"5441asdjfjgllkalsdj;l24224aasd7f".try_into().unwrap(),
data: netstar::PacketType::Data {
nonce: b"4;af;pasd'[p;]12asd42asd".try_into().unwrap(),
buffer: plaintext,
},
})
.zip((0..iters).map(|_| (0..*size + 16).map(|_| 0u8).collect::<BytesMut>()))
.collect::<Vec<(Packet, BytesMut)>>();
let mut collector = Vec::with_capacity(iters as usize);
let start = Instant::now();
actix_rt.block_on(async {
for (packet, ciphertext) in data {
collector.push(addr.send(Encrypt {
packet,
xaed: xaed.clone(),
buffer: ciphertext,
}));
}
for request in collector {
request.await.unwrap();
}
});
start.elapsed()
});
},
);
}
}
criterion_group!(benches, worker_benchmark);
criterion_main!(benches);

6
crypto/src/nonce.rs

@ -19,6 +19,12 @@ impl Nonce {
}
}
impl From<&[u8; 24]> for Nonce {
fn from(array: &[u8; 24]) -> Self {
Self(*array)
}
}
impl From<[u8; 24]> for Nonce {
fn from(array: [u8; 24]) -> Self {
Self(array)

2
crypto/src/private_key.rs

@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
pub struct PrivateKey(pub(crate) [u8; 32]);
impl PrivateKey {
pub fn new<R: rand::Rng + rand::CryptoRng>(rng: &mut R) -> Self {
pub fn new<R: rand::RngCore + rand::CryptoRng>(rng: &mut R) -> Self {
Self(x25519_dalek::StaticSecret::new(rng).to_bytes().clone())
}

BIN
images/graph.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

BIN
images/network_icon.svg

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

11
src/lib.rs

@ -0,0 +1,11 @@
mod config;
pub use config::Config;
mod error;
pub use error::Error;
mod netstar;
pub use netstar::{
worker::{Decrypt, Encrypt, Worker},
Netstar, Packet, PacketType,
};

4
src/main.rs

@ -5,13 +5,13 @@ mod error;
pub use error::Error;
mod netstar;
pub use netstar::Netstar;
pub use crate::netstar::Netstar;
#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let netstar = Netstar::new().await;
actix_rt::Arbiter::local_join().await;
actix_rt::Arbiter::new();
Ok(())
}

26
src/netstar/mod.rs

@ -3,16 +3,16 @@ use actix::{Actor, Addr, SyncArbiter};
use crate::config::Config;
mod packet;
pub use packet::Packet;
pub use packet::{Packet, PacketType};
mod packet_handler;
pub use packet_handler::PacketHandler;
mod network_gateway;
pub use network_gateway::NetworkGateway;
mod worker;
pub use worker::Worker;
pub mod worker;
use worker::Worker;
mod work_consumer;
pub use work_consumer::WorkConsumer;
mod work_orchestrator;
pub use work_orchestrator::WorkConsumer;
pub mod central_store;
@ -22,24 +22,26 @@ pub struct Netstar {
central_store: Addr<central_store::CentralStore>,
work_consumer: Addr<WorkConsumer>,
worker: Addr<Worker>,
packet_consumer: Addr<PacketHandler>,
packet_consumer: Addr<NetworkGateway>,
}
impl Netstar {
pub async fn new() -> Result<Self, crate::Error> {
let config = Config::default();
let central_store = central_store::CentralStore::new(crypto::PrivateKey::new(&mut rand::thread_rng())).start();
let central_store =
central_store::CentralStore::new(crypto::PrivateKey::new(&mut rand::thread_rng()))
.start();
let work_consumer = WorkConsumer::new().start();
let worker = {
let work_consumer = work_consumer.clone();
SyncArbiter::start(8, move || Worker::new(work_consumer.clone()))
SyncArbiter::start(8, move || Worker::new())
};
let packet_consumer = PacketHandler::new(
let packet_consumer = NetworkGateway::new(
config.connection_address,
worker.clone(),
central_store.clone(),
)
.await?;
.await?;
Ok(Self {
central_store,

21
src/netstar/packet/mod.rs

@ -1,4 +1,4 @@
use bytes::BufMut;
use bytes::{BufMut, BytesMut};
use crypto::{EphemeralBlob, IdHash, Nonce, PublicKey};
use std::mem::{size_of, size_of_val};
use tokio_util::codec::{Decoder, Encoder};
@ -49,7 +49,7 @@ pub enum PacketType {
},
Data {
nonce: Nonce,
ciphertext: Vec<u8>, // @TODO I still don't like this
buffer: BytesMut, // @TODO I still don't like this
},
}
@ -78,7 +78,11 @@ impl Packet {
pub fn deserialize(bytes: &[u8]) -> Result<Self, crate::Error> {
match parser::packet(bytes) {
Ok((_, packet)) => Ok(packet),
Err(err) => Err(crate::Error::Nom(err.to_owned())),
Err(err) => {
Err(crate::Error::Nom(err.map(|err| {
nom::error::Error::new(err.input.into(), err.code)
})))
}
}
}
@ -104,9 +108,9 @@ impl Packet {
PacketType::Capabilities { .. } => {
acc += size_of::<u32>();
}
PacketType::Data { ref ciphertext, .. } => {
PacketType::Data { ref buffer, .. } => {
acc += size_of::<Nonce>();
acc += size_of_val(&ciphertext[..]);
acc += size_of_val(&buffer[..]);
}
}
@ -144,12 +148,9 @@ impl Packet {
PacketType::Capabilities { capabilities } => {
dst.put_u32(capabilities);
}
PacketType::Data {
nonce,
ref ciphertext,
} => {
PacketType::Data { nonce, ref buffer } => {
dst.put_slice(nonce.as_ref());
dst.put_slice(&ciphertext[..])
dst.put_slice(&buffer[..])
}
}

14
src/netstar/packet/parser.rs

@ -100,9 +100,19 @@ fn parse_packet_capabilities(input: &[u8]) -> IResult<&[u8], PacketType> {
fn parse_packet_data(input: &[u8]) -> IResult<&[u8], PacketType> {
let (input, nonce) = parse_nonce(input)?;
let (input, length) = le_u16(input)?;
let (input, ciphertext) = map(take(length), |slice: &[u8]| slice.to_owned())(input)?;
let (input, ciphertext) = map(take(length), |slice: &[u8]| {
let mut bytes = BytesMut::new();
bytes.extend_from_slice(slice);
bytes
})(input)?;
Ok((input, PacketType::Data { nonce, ciphertext }))
Ok((
input,
PacketType::Data {
nonce,
buffer: ciphertext,
},
))
}
pub fn packet(input: &[u8]) -> IResult<&[u8], Packet> {

128
src/netstar/worker.rs

@ -1,50 +1,136 @@
use actix::{Actor, Addr, Handler, Message, SyncContext};
use bytes::{buf::BufMutExt, BytesMut};
use std::net::SocketAddr;
use bytes::BytesMut;
use crypto::XChaCha20Poly1305;
use super::{Packet, WorkConsumer};
use super::{packet::PacketType, Packet, WorkConsumer};
pub struct Worker {
work_handler: Addr<WorkConsumer>,
}
pub struct Worker {}
impl Actor for Worker {
type Context = SyncContext<Self>;
}
impl Handler<Encrypt> for Worker {
type Result = Result<(), crate::Error>;
type Result = Result<(Packet, BytesMut), crate::Error>;
fn handle(&mut self, msg: Encrypt, ctx: &mut Self::Context) -> Self::Result {
todo!()
fn handle(&mut self, mut msg: Encrypt, ctx: &mut Self::Context) -> Self::Result {
match msg.packet.data {
PacketType::Data { nonce, buffer } => {
msg.xaed
.encrypt(&nonce, buffer.as_ref(), msg.buffer.as_mut())
.map_err(|_| crate::Error::NoEndpoint)?; // FIXME will error, because the target BytesMut, might not be large enough
Ok((
Packet {
id_hash: msg.packet.id_hash,
data: PacketType::Data {
nonce,
buffer: msg.buffer,
},
},
buffer,
))
}
_ => Ok((msg.packet, msg.buffer)),
}
}
}
impl Handler<Decrypt> for Worker {
type Result = Result<(), crate::Error>;
type Result = Result<(Packet, BytesMut), crate::Error>;
fn handle(&mut self, msg: Decrypt, ctx: &mut Self::Context) -> Self::Result {
todo!()
fn handle(&mut self, mut msg: Decrypt, ctx: &mut Self::Context) -> Self::Result {
match msg.packet.data {
PacketType::Data { nonce, buffer } => {
msg.xaed
.decrypt(&nonce, msg.buffer.as_mut(), buffer.as_ref())
.map_err(|_| crate::Error::NoEndpoint)?; // FIXME will error, because the target BytesMut, might not be large enough
Ok((
Packet {
id_hash: msg.packet.id_hash,
data: PacketType::Data {
nonce,
buffer: msg.buffer,
},
},
buffer,
))
}
_ => Ok((msg.packet, msg.buffer)),
}
}
}
impl Worker {
pub fn new(work_handler: Addr<WorkConsumer>) -> Self {
Self { work_handler }
pub fn new() -> Self {
Self {}
}
}
#[derive(Message)]
#[rtype(result = "Result<(), crate::Error>")]
#[rtype(result = "Result<(Packet, BytesMut), crate::Error>")]
pub struct Encrypt {
packet: Packet,
addr: SocketAddr,
plaintext: BytesMut,
pub packet: Packet,
pub xaed: XChaCha20Poly1305,
pub buffer: BytesMut,
}
#[derive(Message)]
#[rtype(result = "Result<(), crate::Error>")]
#[rtype(result = "Result<(Packet, BytesMut), crate::Error>")]
pub struct Decrypt {
packet: Packet,
addr: SocketAddr,
pub packet: Packet,
pub xaed: XChaCha20Poly1305,
pub buffer: BytesMut,
}
#[cfg(test)]
mod tests {
use std::{convert::TryInto, error::Error};
use actix::SyncArbiter;
use bytes::BytesMut;
use crypto::{SharedKey, XChaCha20Poly1305};
use crate::netstar::{packet::PacketType, Packet};
use super::{Decrypt, Encrypt, Worker};
#[actix_rt::test]
async fn roundtrip() -> Result<(), Box<dyn Error>> {
let addr = SyncArbiter::start(1, || Worker::new());
let message = "Hello, world";
let packet = Packet {
id_hash: b"5441asdjfjgllkalsdj;l24224aasd7f".try_into()?,
data: PacketType::Data {
nonce: b"4;af;pasd'[p;]12asd42asd".try_into()?,
buffer: BytesMut::from(message),
},
};
let key =
unsafe { std::mem::transmute::<_, SharedKey>(*b"1d4f47r4f2asd514sdasd75882';'][]") };
let xaed = XChaCha20Poly1305::new(&key);
let (packet, buffer): (Packet, BytesMut) = addr
.send(Encrypt {
packet,
xaed: xaed.clone(),
buffer: (0..message.len() + 16).map(|_| 0u8).collect::<BytesMut>(),
})
.await??;
let (packet, _): (Packet, BytesMut) = addr
.send(Decrypt {
packet,
xaed,
buffer,
})
.await??;
if let PacketType::Data { nonce: _, buffer } = packet.data {
assert_eq!(buffer.as_ref(), message.as_bytes());
}
Ok(())
}
}

Loading…
Cancel
Save