use std::collections::{BTreeMap, HashMap};
use std::time::Duration;
use actix::clock::sleep;
use actix::dev::*;
use log::info;
use tokio::sync::oneshot::channel;
use super::counter::Counter;
use crate::errors::*;
use crate::master::messages::{
AddSite, AddVisitor, GetInternalData, RemoveCaptcha, Rename, SetInternalData,
};
use crate::master::Master as MasterTrait;
use super::counter::{GetCurrentVisitorCount, Stop};
/// Actor state holding the registry of per-site [`Counter`] actors.
#[derive(Clone, Default)]
pub struct Master {
/// Site ID mapped to `(accessed flag, counter address)`.
///
/// The flag starts as `None` when a site is added and is set to `Some(())`
/// by `get_site`; the `CleanUp` handler only removes sites whose flag is set.
sites: BTreeMap<String, (Option<()>, Addr<Counter>)>,
/// Number of seconds the cleanup task sleeps between passes.
gc: u64,
}
// No items are defined in this impl; whatever `MasterTrait` requires or provides
// is declared where the trait is defined (not visible in this file).
impl MasterTrait for Master {}
impl Master {
pub fn add_site(&mut self, addr: Addr<Counter>, id: String) {
self.sites.insert(id, (None, addr));
}
pub fn new(gc: u64) -> Self {
Master {
sites: BTreeMap::new(),
gc,
}
}
pub fn get_site(&mut self, id: &str) -> Option<Addr<Counter>> {
let mut r = None;
if let Some((read_val, addr)) = self.sites.get_mut(id) {
r = Some(addr.clone());
*read_val = Some(());
};
r
}
pub fn rm_site(&mut self, id: &str) -> Option<(Option<()>, Addr<Counter>)> {
self.sites.remove(id)
}
pub fn rename(&mut self, msg: Rename) {
if let Some((_, counter)) = self.sites.remove(&msg.name) {
self.add_site(counter, msg.rename_to);
}
}
}
impl Actor for Master {
    type Context = Context<Self>;

    /// Starts the cleanup loop as soon as the actor is running.
    ///
    /// The first `CleanUp` is queued on the actor's own context with
    /// `notify`, so no future has to be spawned and there is no mailbox
    /// error to unwrap. Every later pass is scheduled by the `CleanUp`
    /// handler itself.
    fn started(&mut self, ctx: &mut Self::Context) {
        ctx.notify(CleanUp);
    }
}
/// Forwards an `AddVisitor` request to the counter of the addressed site.
///
/// The reply is delivered through a oneshot channel:
/// - `Ok(None)` when the site ID (`m.0`) is not registered,
/// - `Ok(Some(_))` carrying the counter's answer,
/// - `Err(_)` when the message could not be delivered to the counter.
impl Handler<AddVisitor> for Master {
    type Result = MessageResult<AddVisitor>;

    fn handle(&mut self, m: AddVisitor, ctx: &mut Self::Context) -> Self::Result {
        let (reply, receiver) = channel();

        if let Some(counter) = self.get_site(&m.0) {
            // The counter is asked from a spawned future so this handler
            // returns without waiting for the answer.
            let forward = async move {
                let outcome = counter
                    .send(super::counter::AddVisitor)
                    .await
                    .map(Some)
                    .map_err(|e| -> CaptchaError { e.into() });
                // The requester may have dropped the receiver; that is not an error here.
                let _ = reply.send(outcome);
            };
            ctx.spawn(forward.into_actor(self));
        } else {
            let _ = reply.send(Ok(None));
        }

        MessageResult(receiver)
    }
}
/// Renames a site and acknowledges with `Ok(())` through a oneshot channel.
///
/// The acknowledgement is sent whether or not the old name was registered.
impl Handler<Rename> for Master {
    type Result = MessageResult<Rename>;

    fn handle(&mut self, m: Rename, _ctx: &mut Self::Context) -> Self::Result {
        let (reply, receiver) = channel();
        self.rename(m);
        // A failed send only means the receiver is gone; nothing to report.
        let _ = reply.send(Ok(()));
        MessageResult(receiver)
    }
}
/// Message asking `Master` for the `Counter` address registered under the
/// wrapped site ID.
///
/// The reply is `None` when no site with that ID is registered. Handling the
/// message goes through `Master::get_site`, which marks a found site as accessed.
#[derive(Message)]
#[rtype(result = "Option<Addr<Counter>>")]
pub struct GetSite(pub String);
/// Answers `GetSite` with the counter address of the requested site, or
/// `None` when the site is unknown.
impl Handler<GetSite> for Master {
    type Result = MessageResult<GetSite>;

    fn handle(&mut self, m: GetSite, _ctx: &mut Self::Context) -> Self::Result {
        // `get_site` already yields exactly the `Option` the message expects.
        let lookup = self.get_site(&m.0);
        MessageResult(lookup)
    }
}
/// Message that triggers one cleanup pass over the registered sites.
///
/// `Master` sends it to itself when it starts, and every pass sends it again
/// after sleeping for the configured interval, which keeps the loop running.
#[derive(Message)]
#[rtype(result = "()")]
pub struct CleanUp;
impl Handler<CleanUp> for Master {
type Result = ();
fn handle(&mut self, _: CleanUp, ctx: &mut Self::Context) -> Self::Result {
let sites = self.sites.clone();
let gc = self.gc;
let master = ctx.address();
info!("init master actor cleanup up");
let task = async move {
for (id, (new, addr)) in sites.iter() {
let visitor_count = addr.send(GetCurrentVisitorCount).await.unwrap();
println!("{}", visitor_count);
if visitor_count == 0 && new.is_some() {
addr.send(Stop).await.unwrap();
master.send(RemoveCaptcha(id.to_owned())).await.unwrap();
println!("cleaned up");
}
}
let duration = Duration::new(gc, 0);
sleep(duration).await;
master.send(CleanUp).await.unwrap();
}
.into_actor(self);
ctx.spawn(task);
}
}
/// Unregisters a site and stops its counter.
///
/// The reply arrives through a oneshot channel: `Ok(())` immediately when the
/// site ID (`m.0`) is not registered, otherwise the outcome of delivering
/// `Stop` to the removed counter.
impl Handler<RemoveCaptcha> for Master {
    type Result = MessageResult<RemoveCaptcha>;

    fn handle(&mut self, m: RemoveCaptcha, ctx: &mut Self::Context) -> Self::Result {
        let (reply, receiver) = channel();

        match self.rm_site(&m.0) {
            Some((_, counter)) => {
                // The entry is already gone from the registry; stopping the
                // counter happens in the background.
                let stop_counter = async move {
                    let outcome: CaptchaResult<()> =
                        counter.send(Stop).await.map_err(|e| e.into());
                    let _ = reply.send(outcome);
                };
                ctx.spawn(stop_counter.into_actor(self));
            }
            None => reply.send(Ok(())).unwrap(),
        }

        MessageResult(receiver)
    }
}
impl Handler<AddSite> for Master {
type Result = MessageResult<AddSite>;
fn handle(&mut self, m: AddSite, _ctx: &mut Self::Context) -> Self::Result {
let (tx, rx) = channel();
let counter: Counter = m.mcaptcha.into();
let addr = counter.start();
self.add_site(addr, m.id);
tx.send(Ok(())).unwrap();
MessageResult(rx)
}
}
impl Handler<GetInternalData> for Master {
type Result = MessageResult<GetInternalData>;
fn handle(&mut self, _m: GetInternalData, ctx: &mut Self::Context) -> Self::Result {
let (tx, rx) = channel();
let mut data = HashMap::with_capacity(self.sites.len());
let sites = self.sites.clone();
let fut = async move {
for (name, (_read_val, addr)) in sites.iter() {
println!("Trying to get data {name} 1");
match addr.send(super::counter::GetInternalData).await {
Ok(val) => {
println!("Trying to get data {name} 2");
data.insert(name.to_owned(), val);
}
Err(_e) => {
println!("Trying to get data {name}. Failed");
continue;
}
}
println!("Trying to get data {name} 4");
}
tx.send(Ok(data));
}
.into_actor(self);
ctx.spawn(fut);
println!("Trying to get data 3");
MessageResult(rx)
}
}
impl Handler<SetInternalData> for Master {
type Result = MessageResult<SetInternalData>;
fn handle(&mut self, mut m: SetInternalData, ctx: &mut Self::Context) -> Self::Result {
let (tx, rx) = channel();
for (name, mcaptcha) in m.mcaptcha.drain() {
let addr = self.get_site(&name);
let master = ctx.address();
let fut = async move {
match addr {
None => {
master.send(AddSite { id: name, mcaptcha }).await.unwrap();
}
Some(addr) => {
let _ = addr.send(super::counter::SetInternalData(mcaptcha)).await;
}
}
}
.into_actor(self);
ctx.spawn(fut);
}
let _ = tx.send(Ok(()));
MessageResult(rx)
}
}
#[cfg(test)]
mod tests {
use super::*;
// NOTE(review): `get_mcaptcha` and `DURATION` are not defined in this file;
// presumably they come from this glob import of the counter tests — confirm.
use crate::master::embedded::counter::tests::*;
use crate::master::messages::AddSiteBuilder;
use crate::master::messages::RenameBuilder;
use crate::MCaptcha;
// End-to-end walk through the master actor: add a site, read and write its
// internal data, rename it, and finally wait for the cleanup task to remove it.
#[actix_rt::test]
async fn master_actor_works() {
// Cleanup interval of one second.
let addr = Master::new(1).start();
// Helper building an `AddSite` message for the given ID and captcha.
let get_add_site_msg = |id: String, mcaptcha: MCaptcha| {
AddSiteBuilder::default()
.id(id)
.mcaptcha(mcaptcha)
.build()
.unwrap()
};
let id = "yo";
let msg = get_add_site_msg(id.into(), get_mcaptcha());
addr.send(msg).await.unwrap();
// The freshly added site must be retrievable.
let mcaptcha_addr = addr.send(GetSite(id.into())).await.unwrap();
assert!(mcaptcha_addr.is_some());
// First `.await` resolves the actor reply (a oneshot receiver), the second
// `.await` resolves the receiver itself.
let mut mcaptcha_data = addr
.send(GetInternalData)
.await
.unwrap()
.await
.unwrap()
.unwrap();
// Modify the local copy, then push it back into the master.
mcaptcha_data.get_mut(id).unwrap().add_visitor();
mcaptcha_data.get_mut(id).unwrap().add_visitor();
mcaptcha_data.get_mut(id).unwrap().add_visitor();
addr.send(SetInternalData {
mcaptcha: mcaptcha_data.clone(),
})
.await
.unwrap();
// The data read back must report the same visitor count as the copy that was written.
assert_eq!(
addr.send(GetInternalData)
.await
.unwrap()
.await
.unwrap()
.unwrap()
.get(id)
.unwrap()
.get_visitors(),
mcaptcha_data.get(id).unwrap().get_visitors()
);
// Rename the site and check that it is reachable under the new ID.
let new_id = "yoyo";
let rename = RenameBuilder::default()
.name(id.into())
.rename_to(new_id.into())
.build()
.unwrap();
addr.send(rename).await.unwrap();
let mcaptcha_addr = addr.send(GetSite(new_id.into())).await.unwrap();
assert!(mcaptcha_addr.is_some());
// An ID that was never registered yields `None`.
let addr_doesnt_exist = addr.send(GetSite("a".into())).await.unwrap();
assert!(addr_doesnt_exist.is_none());
// Wait two `DURATION` periods; the assertion below expects the cleanup task
// to have removed the (accessed, visitor-free) site by then.
let timer_expire = Duration::new(DURATION, 0);
sleep(timer_expire).await;
sleep(timer_expire).await;
let mcaptcha_addr = addr.send(GetSite(new_id.into())).await.unwrap();
assert_eq!(mcaptcha_addr, None);
// Removing an already-removed site must still be deliverable to the master.
assert!(addr.send(RemoveCaptcha(new_id.into())).await.is_ok());
}
}