https://users.rust-lang.org/t/execute-future-in-background-using-tokio-runtime/97780/2
↑を読んだらうまく書けそう(ebpf-afw-rust の話)。





以下、浩さんとのやりとりのメモ

リクルートに登録。Wantedly は技術によりすぎているかも。
全体的な新卒向けのサービスを使ってみたほうがいい。一般的な方面のところで、狭い部分に入り込む。


  • 業種
  • 先輩の話を聞いてみる





(就職)?四季報 を買った。
英語を活かしたい。
メーカー関連のインターンに行ってみる、など。インターンでなくても、会社の説明会でもいいかもしれない。
日本信号





use aya::maps::{MapData, RingBuf};  
use ebpf\_afw\_common::UnknownPacket; 
 
 
use std::os::fd::{AsRawFd, RawFd}; 
use std::time::Duration; 
use anyhow::Error; 
use tokio::io::unix::AsyncFd; 
use tokio::sync::mpsc::Sender; 
use tokio::time::timeout; 
 
pub struct UnknownPacketsRingbuf { 
    inner: RingBuf<MapData>, 
    async\_fd: AsyncFd<RawFd>, 

 
impl UnknownPacketsRingbuf { 
    pub fn new(inner: RingBuf<MapData>) -> Self { 
        let raw\_fd = inner.as\_raw\_fd(); 
 
        Self { 
            inner, 
            async\_fd: AsyncFd::new(raw\_fd).unwrap(), 
        } 
    } 
 
    pub async fn send\_packets(&mut self, 
                              mut up\_tx: Sender<UnknownPacket>, 
    ) -> Result<(), Error> { 
        if let Ok(Ok(mut ready\_guard)) = 
            timeout(Duration::from\_millis(100), self.async\_fd.readable()).await { 
            ready\_guard.clear\_ready(); 
 
            while let Some(data) = self.inner.next() { 
                let value = &\*data; 
                let up = UnknownPacket::try\_from(value).unwrap(); 
                up\_tx.send(up).await?; 
            } 
            Ok(()) 
        } else { Ok(()) } 
    } 
 
    /// 外から closure を受け取って、パケットが到達する度にその closure を呼び出すような Future を作成し、返します。 
    pub async fn register\_handler<T>(mut self, mut closure: T) 
        where 
            T: FnMut(UnknownPacket), 
    { 
        let async\_fd = AsyncFd::new(self.inner.as\_raw\_fd()).unwrap(); 
        loop { 
            let mut guard = async\_fd.readable().await.unwrap(); 
            while let Some(read) = (&mut self.inner).next() { 
                let value = &\*read; 
                let up = UnknownPacket::try\_from(value).unwrap(); 
                closure(up); 
            } 
            guard.clear\_ready(); 
        } 
    } 
}


use crate::ipv4\_rule\_map::{Ipv4RuleHashMap, Ipv4RuleMap};  
use anyhow::{bail, Context, Error}; 
use aya::maps::{HashMap, MapError, RingBuf}; 
use aya::programs::{SchedClassifier, tc, TcAttachType}; 
use aya::{include\_bytes\_aligned, Bpf}; 
use aya\_log::BpfLogger; 
use tokio::io::unix::AsyncFd; 
use tokio::sync::mpsc::error::TryRecvError; 
use tokio::sync::mpsc::{Receiver, Sender}; 
use tokio::task::{JoinHandle, yield\_now}; 
use ebpf\_afw\_common::{Ipv4FiveTuple, PacketAction, UnknownPacket}; 
use crate::classifier::ClassifierThreadAction::Stop; 
use crate::unknown\_packets\_ringbuf::UnknownPacketsRingbuf; 
 
/// rule map に基づいたパケットフィルタリングを実施する主体 
/// インスタンスが作成されると、特定の network interface にアタッチされて動作を開始する 
pub struct Classifier { 
    bpf: Bpf, 

 
impl Classifier { 
    /// 与えられた network interface に対するフィルタリングを開始し、Classifier を返します。 
    pub fn try\_new(interface: &str) -> Result<(Self, UnknownPacketsRingbuf, Ipv4RuleMap), Error> { 
        // Bump the memlock rlimit. This is needed for older kernels that don't use the 
        // new memcg based accounting, see https://lwn.net/Articles/837122/ 
        let rlim = libc::rlimit { 
            rlim\_cur: libc::RLIM\_INFINITY, 
            rlim\_max: libc::RLIM\_INFINITY, 
        }; 
        let ret = unsafe { libc::setrlimit(libc::RLIMIT\_MEMLOCK, &rlim) }; 
        if ret != 0 { 
            bail!("remove limit on locked memory failed, ret is: {}", ret) 
        } 
 
        #\[cfg(debug\_assertions)\] 
        let mut bpf = Bpf::load(include\_bytes\_aligned!( 
            "../../target/bpfel-unknown-none/debug/ebpf-afw" 
        ))?; 
        #\[cfg(not(debug\_assertions))\] 
        let mut bpf = Bpf::load(include\_bytes\_aligned!( 
            "../../target/bpfel-unknown-none/release/ebpf-afw" 
        ))?; 
        BpfLogger::init(&mut bpf)?; 
 
        // error adding clsact to the interface if it is already added is harmless 
        // the full cleanup can be done with 'sudo tc qdisc del dev eth0 clsact'. 
        let \_ = tc::qdisc\_add\_clsact(interface); 
        let program: &mut SchedClassifier = bpf.program\_mut("ebpf\_afw").unwrap().try\_into()?; 
        program.load()?; 
 
        program.attach(interface, TcAttachType::Egress)?; 
        program.attach(interface, TcAttachType::Ingress)?; 
 
        // eBPF プログラムが読むルールにあてはまらなかったパケットのリングバッファ 
        let unknown\_packets = RingBuf::try\_from( 
            bpf.take\_map("UNKNOWN\_PACKETS") 
                .context("Failed to take ringbuf map")?, 
        )?; 
 
        // eBPF プログラムが読むルールの一覧 
        let v4\_rule\_map: Ipv4RuleHashMap = HashMap::try\_from( 
            bpf.take\_map("V4\_RULE\_MAP") 
                .context("Failed to take hashmap map")?, 
        )?; 
 
        Ok((Self { bpf }, UnknownPacketsRingbuf::new(unknown\_packets), Ipv4RuleMap::new(v4\_rule\_map))) 
    } 

 
#\[derive(Eq, PartialEq)\] 
pub enum ClassifierThreadAction { 
    Stop, 
    AddRule(Ipv4FiveTuple, PacketAction), 
    RemoveRule(Ipv4FiveTuple), 

 
pub async fn classifier\_thread( 
    interface: String, 
    mut cta\_rx: Receiver<ClassifierThreadAction>, 
    mut up\_tx: Sender<UnknownPacket>, 
) -> Result<(), Error> { 
    let (classifier , mut ringbuf, mut v4\_rule\_map) = Classifier::try\_new(interface.as\_str())?; 
 
    loop { 
        let result = cta\_rx.try\_recv(); 
        if let Err(e) = result { 
            match e { 
                TryRecvError::Empty => println!("empty"), 
                TryRecvError::Disconnected => bail!("disconnected, bail"), 
            } 
        } else { 
            let action = result.unwrap(); 
            if action == Stop { 
                break Ok(()); 
            } else { 
                process\_classifier\_thread\_action(&mut v4\_rule\_map, action)?; 
            } 
        } 
        println!("classifier"); 
 
        ringbuf.send\_packets(up\_tx.clone()).await?; 
        // yield\_now().await; 
    } 

 
fn process\_classifier\_thread\_action( 
    v4\_rule\_map: &mut Ipv4RuleMap, 
    classifier\_thread\_action: ClassifierThreadAction, 
) -> Result<(), MapError> { 
    match classifier\_thread\_action { 
        ClassifierThreadAction::Stop => Ok(()), 
        ClassifierThreadAction::AddRule(tuple, packet\_action) => { 
            v4\_rule\_map.insert(&tuple, packet\_action) 
        }, 
        ClassifierThreadAction::RemoveRule(tuple) => { 
            v4\_rule\_map.remove(&tuple) 
        } 
    } 
}


#!\[feature(const\_try)\]  
 
use log::{error, info, warn}; 
use network\_types::eth::EthHdr; 
use network\_types::ip::{IpProto, Ipv4Hdr}; 
 
use ebpf\_afw\_core::ipv4addr::Ipv4Addr; 
 
use ebpf\_afw\_common::ipv4\_five\_tuple::Ipv4FiveTuple; 
use ebpf\_afw\_common::packet\_action::PacketAction; 
use ebpf\_afw\_common::port::Port; 
use ebpf\_afw\_common::unknown\_packet::UnknownPacket; 
use ebpf\_afw\_core::classifier::{Classifier, classifier\_thread, ClassifierThreadAction}; 
use ebpf\_afw\_core::ipv4\_rule\_map::Ipv4RuleMap; 
use std::ptr::read\_unaligned; 
use std::sync::atomic::{AtomicBool, Ordering}; 
use std::sync::{Arc}; 
use tokio::{join, signal, task}; 
use tokio::sync::mpsc; 
use tokio::sync::mpsc::error::TryRecvError; 
use tokio::task::{LocalSet, spawn\_local, yield\_now}; 
 
#\[tokio::main\] 
async fn main() -> Result<(), anyhow::Error> { 
    let s = LocalSet::new(); 
    s.run\_until(inner\_main()).await 

 
async fn inner\_main() -> Result<(), anyhow::Error> { 
    env\_logger::init(); 
 
    let (\_, mut unknown\_packets\_ringbuf, mut v4\_rule\_map) = Classifier::try\_new("enp0s31f6")?; 
    if let Err(e) = v4\_rule\_map.load\_saved() { 
        warn!("Couldn't load v4 rule map from file: {}", e); 
    } 
 
    // 192.168.1.3 から 1.1.1.1 への ICMP Echo Request を送らないためのルールを試験的に追加 
    let example\_rule = &Ipv4FiveTuple::new( 
        Ipv4Addr::new(192, 168, 1, 3).into(), 
        Ipv4Addr::new(1, 1, 1, 1).into(), 
        Port::ANY, 
        IpProto::Icmp, 
    ); 
 
    v4\_rule\_map.insert(example\_rule, PacketAction::Deny)?; 
 
    let local\_addr = Ipv4Addr::new(192, 168, 1, 3); 
    allow\_outgoing\_dns( 
        &mut v4\_rule\_map, 
        local\_addr, 
        Ipv4Addr::new(1, 1, 1, 1), 
    )?; 
    allow\_outgoing\_dns( 
        &mut v4\_rule\_map, 
        local\_addr, 
        Ipv4Addr::new(1, 0, 0, 1), 
    )?; 
    allow\_outgoing\_dns( 
        &mut v4\_rule\_map, 
        local\_addr, 
        Ipv4Addr::new(8, 8, 8, 8), 
    )?; 
    allow\_outgoing\_dns( 
        &mut v4\_rule\_map, 
        local\_addr, 
        Ipv4Addr::new(8, 8, 4, 4), 
    )?; 
    allow\_outgoing\_https( 
        &mut v4\_rule\_map, 
        local\_addr, 
        Ipv4Addr::new(1, 1, 1, 1), 
    )?; 
 
    let close = Arc::new(AtomicBool::new(false)); 
 
    let (cta\_tx, cta\_rx) = mpsc::channel(100); 
    let (up\_tx, mut up\_rx) = mpsc::channel(100); 
 
    // setup signal handler 
    let close\_me = close.clone(); 
    // let signal\_handler = spawn\_local(async move { 
    //     info!("Waiting for Ctrl-C..."); 
    //     signal::ctrl\_c().await.unwrap(); 
    //     info!("Exiting..."); 
    //     close\_me.store(true, Ordering::Relaxed); 
    //     cta\_tx.send(ClassifierThreadAction::Stop).await.expect("An error occured while trying to stop Classifier"); 
    // }); 
 
    let classifier\_handler = spawn\_local( 
        classifier\_thread("enp0s31f6".to\_string(), cta\_rx, up\_tx) 
    ); 
 
    let packet\_handler = spawn\_local(async move { 
        loop { 
            if close.load(Ordering::Relaxed) { 
                let v4\_rule\_map\_filesize = v4\_rule\_map.save\_into\_file().unwrap(); 
                info!("V4\_RULE\_MAP saved: {} bytes", &v4\_rule\_map\_filesize); 
                return; 
            } 
 
            let recv = up\_rx.try\_recv(); 
            match recv { 
                Err(TryRecvError::Empty) => {}, 
                Err(TryRecvError::Disconnected) => println!("disconnected"), 
                Ok(up) => process\_packet(up, &mut v4\_rule\_map), 
            } 
            yield\_now().await; 
        } 
    }); 
 
    info!("Waiting for Ctrl-C..."); 
    signal::ctrl\_c().await.unwrap(); 
    info!("Exiting..."); 
    close\_me.store(true, Ordering::Relaxed); 
    cta\_tx.send(ClassifierThreadAction::Stop).await.expect("An error occured while trying to stop Classifier"); 
 
    Ok(()) 

 
/// BPF に格納されたルールに適合しなかったパケットに関する処理を実施します。 
/// 今は試験的に、そのようなパケットを許可するルールを追加します。 
fn process\_packet(up: UnknownPacket, v4\_rule\_map: &mut Ipv4RuleMap) { 
    if up.len < (Ipv4Hdr::LEN + EthHdr::LEN) { 
        return; 
    } 
 
    let (\_, rest) = up.data.split\_at(EthHdr::LEN); 
    let (ipv4hdr, \_) = rest.split\_at(Ipv4Hdr::LEN); 
 
    let buf = (ipv4hdr.as\_ptr()) as \*const Ipv4Hdr; 
    let ipv4hdr = unsafe { read\_unaligned(buf) }; 
    info!( 
        "packet from {:#} to {:#}", 
        Ipv4Addr::from(ipv4hdr.src\_addr.to\_be()), 
        Ipv4Addr::from(ipv4hdr.dst\_addr.to\_be()) 
    ); 
 
    let new\_key: Ipv4FiveTuple = (&up).try\_into().unwrap(); 
    let new\_value = PacketAction::Allow; 
 
    if let Err(error) = v4\_rule\_map.insert(&new\_key, new\_value) { 
        error!( 
            "An error happened while adding rule to BPF hashmap: {}", 
            error 
        ); 
    } else { 
        info!("Added rule: {:#?}", new\_key); 
    } 

 
fn allow\_outgoing\_dns( 
    v4\_rule\_map: &mut Ipv4RuleMap, 
    local\_addr: Ipv4Addr, 
    remote\_addr: Ipv4Addr, 
) -> Result<(), anyhow::Error> { 
    let egress\_rule = &Ipv4FiveTuple::new( 
        local\_addr.into(), 
        remote\_addr.into(), 
        Port::ANY, 
        IpProto::Udp, 
    ); 
    let ingress\_rule = &Ipv4FiveTuple::new( 
        remote\_addr.into(), 
        local\_addr.into(), 
        Port::ANY, 
        IpProto::Udp, 
    ); 
    v4\_rule\_map.insert(egress\_rule, PacketAction::Allow)?; 
    v4\_rule\_map.insert(ingress\_rule, PacketAction::Allow)?; 
    Ok(()) 

 
fn allow\_outgoing\_https( 
    v4\_rule\_map: &mut Ipv4RuleMap, 
    local\_addr: Ipv4Addr, 
    remote\_addr: Ipv4Addr, 
) -> Result<(), anyhow::Error> { 
    let egress\_rule = &Ipv4FiveTuple::new( 
        local\_addr.into(), 
        remote\_addr.into(), 
        Port { 
            dst\_port: 443, 
            ..Port::ANY 
        }, 
        IpProto::Tcp, 
    ); 
    let ingress\_rule = &Ipv4FiveTuple::new( 
        remote\_addr.into(), 
        local\_addr.into(), 
        Port { 
            src\_port: 443, 
            ..Port::ANY 
        }, 
        IpProto::Tcp, 
    ); 
    v4\_rule\_map.insert(egress\_rule, PacketAction::Allow)?; 
    v4\_rule\_map.insert(ingress\_rule, PacketAction::Allow)?; 
    Ok(()) 
}