I/O 虚拟化(二):Virtio 协议代码走读

以 Dragonball virtiofs 为例

I/O 虚拟化(二):Virtio 协议代码走读

《I/O 虚拟化(一):Virtqueue 介绍》是从宏观视角解释了 virtqueue 的概念和工作原理,但是不看看代码总觉得不够踏实。这篇文章将会以 Dragonball 项目的 virtiofs 设备 为蓝本,介绍下通用的 virtio 协议在 device 侧(VMM 侧)的实现细节,尽量屏蔽 virtiofs 设备的细节。至于 driver 侧(内核侧)本质上就是 device 侧的反逻辑,如果后面有时间了再单独写一篇文章。

在 driver 准备好数据后,通过 irqfd[1] 通知 device 侧接收数据,在接收到中断信息之后,Dragonball 侧的逻辑主要由 VirtioFsEpollHandler::process() 函数实现。

VirtioFsEpollHandler 的 config 字段的 queues 字段,其类型是 Vec<VirtioQueueConfig<Q>>,其定义如下所示。QueueSync 就是对 Queue 类型的线程安全封装,它们的定义我都列了在下方。这里的 Queue 类型实际上是对 virtqueue 的一种描述,真正的数据需要根据这些描述信息去内存中获取。

关于 virtqueue 的定义在 rust-vmm 中有三个层次。最底层是 QueueT trait,它定义了 virtqueue 的基本行为。

pub trait QueueT: for<'a> QueueGuard<'a> {
    // === snipped ===
    fn desc_table(&self) -> u64;
    fn avail_ring(&self) -> u64;
    fn used_ring(&self) -> u64;
    // === snipped ===
}

中间层是两种 virtqueue 实现,分别是 QueueQueueSyncQueueSync 就是把 Queue 包在了 Arc<RwLock<>> 中,基础实现都在 Queue 中。

pub struct QueueSync {
    state: Arc<Mutex<Queue>>,
}

// Queue 的定义
pub struct Queue {
    /// The maximum size in elements offered by the device.
    max_size: u16,
    /// Tail position of the available ring.
    next_avail: Wrapping<u16>,
    /// Head position of the used ring.
    next_used: Wrapping<u16>,
    /// VIRTIO_F_RING_EVENT_IDX negotiated.
    event_idx_enabled: bool,
    /// The number of descriptor chains placed in the used ring via `add_used`
    /// since the last time `needs_notification` was called on the associated queue.
    num_added: Wrapping<u16>,
    /// The queue size in elements the driver selected.
    size: u16,
    /// Indicates if the queue is finished with configuration.
    ready: bool,
    /// Guest physical address of the descriptor table.
    desc_table: GuestAddress,
    /// Guest physical address of the available ring.
    avail_ring: GuestAddress,
    /// Guest physical address of the used ring.
    used_ring: GuestAddress,
}

从上面的定义中,desc_tableavail_ring 以及 used_ring 对应的前一章中的 virtq_descvirtq_avail 以及 virtq_used。由于是在 guest 中创建的,所以在 device 只定义了在 guest 地址空间的结构体地址。如果想要在 device 中使用,需要先将 guest 地址空间转换为 host 地址空间,然后 0..2addr2..4len……

Virtqueue 空间是由 driver 创建的,device 是如何知道确切的地址呢?答案是通过 MMIO,driver 向指定内存写入数据后,触发 VmExit,会调用 dragonball 的 MmioV2Device::write()

fn write(&self, _base: IoAddress, offset: IoAddress, data: &[u8]) {
    let offset = offset.raw_value();
    // Write to the device configuration area.
    if (MMIO_CFG_SPACE_OFF..DRAGONBALL_MMIO_DOORBELL_OFFSET).contains(&offset) {
        self.set_device_config(offset - MMIO_CFG_SPACE_OFF, data);
    } else if data.len() == 4 {
        let v = LittleEndian::read_u32(data);
        match offset {
            // === snipped ===
            // desc low 32 位地址
            REG_MMIO_QUEUE_DESC_LOW => {
                self.update_queue_field(|q| q.set_desc_table_address(Some(v), None))
            }
            // desc high 32 位地址
            REG_MMIO_QUEUE_DESC_HIGH => {
                self.update_queue_field(|q| q.set_desc_table_address(None, Some(v)))
            }
            REG_MMIO_QUEUE_AVAIL_LOW => {
                self.update_queue_field(|q| q.set_avail_ring_address(Some(v), None))
            }
            REG_MMIO_QUEUE_AVAIL_HIGH => {
                self.update_queue_field(|q| q.set_avail_ring_address(None, Some(v)))
            }
            REG_MMIO_QUEUE_USED_LOW => {
                self.update_queue_field(|q| q.set_used_ring_address(Some(v), None))
            }
            REG_MMIO_QUEUE_USED_HIGH => {
                self.update_queue_field(|q| q.set_used_ring_address(None, Some(v)))
            }
            // === snipped ===
        }
    }
    // === snipped ===
}

最上层是被 virtio 设备直接使用的,出了 queue 的定义外,还规定了 queue 周边内容,比如 eventfd、notifier 等,它是 VirtioQueueConfig

// VirtioQueueConfig 的定义
pub struct VirtioQueueConfig<Q: QueueT = QueueSync> {
    pub queue: Q,
    pub eventfd: Arc<EventFd>,
    notifier: Arc<dyn InterruptNotifier>,
    index: u16,
}

前面说过了 VirtioFsEpollHandler::process() 是处理来自 guest 的 events 的主要逻辑,该函数获取了 VirtioQueueConfig 数组的长度(名为 queues_len)。

fn process(&mut self, events: Events, _ops: &mut EventOps) {
    // snipped
    let slot = events.data();
    // snipped
    let queues_len = queues.len() as u32;
    // snipped
    match slot {
        // snipped: 处理其他 events
        // QUEUE_AVAIL_EVENT
        _ => {
            let idx = (slot - QUEUE_AVAIL_EVENT) as usize;
            if let Err(e) = queues[idx].consume_event() {
                error!("{}: failed to read queue event, {:?}", VIRTIO_FS_NAME, e);
                return;
            }
            drop(guard);

            if let Err(e) = self.process_queue(idx) {
                error!(
                    "{}: process_queue failed due to error {:?}",
                    VIRTIO_FS_NAME, e
                );
            }
        }
    }
}

Event 携带的 data(events.data())表明了本次通知的类型,在 virtiofs 中类型主要有以下几种:

  • Unknown(直接报错)
  • Rate limiter event
  • Patch rate limiter event
  • Queue avail event(在 virtiofs 下 slot 可以理解为 idx,假设有 2 个 queues,那么 slot == 0 就说明第一个 queue 有信息)

我们重点关照的是最后一个 queue avail event,这个事件表示某个 virtqueue 有数据等待被 VMM 处理。首先取走对应 queue 的 eventfd 事件,参见 queue[idx].consume_event(),但直接忽略了 eventfd 传来的值(为什么??那要这个 eventfd 的作用是什么?)。随后调用 process_queue() 处理 queue 中的数据,包含了与 avail vring 和 used vring 的交互。

为了便于理解,先来聊聊 rust-vmm 的 guest 内存管理方案,GuestAddressSpace trait 表示 guest 的地址空间,地址空间可以是一块或多块内存区域。Rust-vmm 为Arc<M> 实现了该接口,那么说明 Arc<M> 可以表示一个或多个 GuestMemory 实例,其中M 的类型是 GuestMemory 表示多块连续的内存区域(memory regions)。

pub trait GuestAddressSpace {
    type M: GuestMemory;
    type T: Clone + Deref<Target = Self::M>;
    fn memory(&self) -> Self::T;
}

impl<M: GuestMemory> GuestAddressSpace for Arc<M> {
    type M = M;
    type T = Self;

    fn memory(&self) -> Self {
        self.clone()
    }
}

接下来介绍 Deref 的作用,Rust 源码给出了以下示例。

use std::ops::Deref;

struct DerefExample<T> {
    value: T
}

impl<T> Deref for DerefExample<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.value
    }
}

let x = DerefExample { value: 'a' };
assert_eq!('a', *x);

当调用 *x 的时候,rust 会自动调用 Defer::defer() 返回引用(&DerefExample::T),然后再解引用,也就是 *x == *&DerefExample::T

GuestMemory 内含了多个区域(每个内存区域的类型是 GuestMemoryRegion),提供了几个方法提供获取内存区域相关的方法。

pub trait GuestMemory {
    type R: GuestMemoryRegion;
    // ...
    fn num_regions(&self) -> usize;
    fn find_region(&self, addr: GuestAddress) -> Option<&Self::R>;
    // ...
}

VirtioDeviceConfig 顾名思义是一个 virtio device 的配置,包含了 guest 地址空间(vm_as)和 queues 配置(queues)。VirtioDeviceConfig 提供了一个 lock_guest_memory(),它的功能是调用 GuestAddressSpace::memory() 返回一个实现了 GuestMemory trait 类型的 guest 内存的引用。

pub struct VirtioDeviceConfig<
    AS: GuestAddressSpace,
    Q: QueueT = QueueSync,
    R: GuestMemoryRegion = GuestRegionMmap,
> {
    pub vm_as: AS,
    // ...
    pub queues: Vec<VirtioQueueConfig<Q>>,
    // ...
}

impl<AS, Q, R> VirtioDeviceConfig<AS, Q, R>
where
    AS: GuestAddressSpace,
    Q: QueueT,
    R: GuestMemoryRegion,
{
    // ...
    pub fn lock_guest_memory(&self) -> AS::T {
        self.vm_as.memory()
    }
}

Rust-vmm 的内存处理的最重要的几个 traits、结构体已经搞清楚了,现在我们可以开始关注 process_queue() 到底做了什么了:访问 avail queue 以及 desc ring 获取等待处理的 buffer,解析请求后发送给 virtiofsd(virtiofs 在 host 上的 server),将处理好的数据写入 buffer 后更新 used ring,最后通知 driver。这个方法每次只处理一个 queue,因此传入参数需要指定 queue 的索引(queue_index)。

fn process_queue(&mut self, queue_index: usize) -> Result<()> {
    // (1)
    let mut config_guard = self.config.lock().unwrap();
    let mem = config_guard.lock_guest_memory();
    let vm_as = config_guard.vm_as.clone();
    // (2)
    let queue = &mut config_guard.queues[queue_index];
    // ...
    // (3)
    let mut queue_guard = queue.queue_mut().lock();
    let mut iter = queue_guard
        .iter(mem.clone())
        .map_err(Error::VirtioQueueError)?;

    // (4)
    for desc_chain in &mut iter {
        //...
        let work_func = move || {
            let guard = vm_as.memory();
            let mem = guard.deref();
            // (5)
            let reader = Reader::from_descriptor_chain(mem, desc_chain.clone())
                .map_err(FsError::InvalidDescriptorChain)
                .unwrap();
            // (6)
            let writer = Writer::VirtioFs(
                VirtioFsWriter::new(mem, desc_chain)
                    .map_err(FsError::InvalidDescriptorChain)
                    .unwrap(),
            );
            // (7)
            let total = server
                .handle_message(
                    reader,
                    writer,
                    cache_handler
                        .as_mut()
                        .map(|x| x as &mut dyn FsCacheReqHandler),
                    None,
                )
                .map_err(FsError::ProcessQueue)
                .unwrap();

            if pooled {
                // (8)
                let queue = &mut config.lock().unwrap().queues[queue_index];
                queue.add_used(mem, head_index, total as u32);
                if let Err(e) = queue.notify() {
                    error!("failed to signal used queue: {:?}", e);
                }
            } else {
                // ...
            }
        };
        // ...
    }
    // ...
    let notify = !self.is_multi_thread() && used_count > 0;
    // unlock QueueT
    drop(queue_guard);
    // ...
    // (9)
    if notify {
        if let Err(e) = queue.notify() {
            error!("failed to signal used queue: {:?}", e);
        }
    }

    Ok(())
}

(1) 是从 VirtioDeviceConfig 中获取 guest 内存 mem,这部分内容在前面介绍过了,这里不再赘述。

(2) 是根据 queue_indexVirtioDeviceConfig 获取一个指定的 queue。

(3) 是读取 avail ring 的 idx 字段并生成了一个 virtq_avail.ring(详见 Avail ring)的迭代器(类型是 AvailIter)。创建迭代器的实际实现的方法是 Queue::iter(),分为两个子步骤:获取 avail ring 的 idx,创建迭代器。

impl QueueOwnedT for Queue {
    fn iter<M>(&mut self, mem: M) -> Result<AvailIter<'_, M>, Error>
    where
        M: Deref,
        M::Target: GuestMemory,
    {
        if !self.ready || self.avail_ring == GuestAddress(0) {
            return Err(Error::QueueNotReady);
        }

        // (3.1)
        self.avail_idx(mem.deref(), Ordering::Acquire)
            // (3.2)
            .map(move |idx| AvailIter::new(mem, idx, self))?
    }
    // ...
}

(3.1) avail_idx() 的作用是获取 idx 的值。Avail ring 存储了 queue 的 flagsle16)和 idxle16),queue 是一个环形队列,每个 entry 保存了 descriptor 的 index(le16)。

self.avail_ring 是 avail ring 的起始地址,checked_add(2) 跳过了 flags 字段(类型是 le16),从内存中加载一个类型是 le16idx,它表示 ring 的头。

impl QueueT for Queue {
    // ...
    fn avail_idx<M>(&self, mem: &M, order: Ordering) -> Result<Wrapping<u16>, Error>
    where
        M: GuestMemory + ?Sized,
    {
        // idx 的起始地址
        let addr = self
            .avail_ring
            .checked_add(2)
            .ok_or(Error::AddressOverflow)?;

        // 从 mem 中加载 idx 的值
        mem.load(addr, order)
            .map(u16::from_le)
            .map(Wrapping)
            .map_err(Error::GuestMemory)
    }
    // ...
}

(3.2) AvailIter::new() 是负责创建 AvailIter 实例的方法,它填充了全部字段:

  • desc_table: desc ring 的起始地址。
  • avail_ring: avail ring 的起始地址。
  • queue_size: queue 的长度。
  • last_index: avail ring 的头,头是针对 driver 来说的,表示 driver 即将到来数据被写入的位置,而对于 device 来说是尾)。
  • next_avail: avail ring 的尾,与 last_index 一样,表示 device 下一次读取的位置。
#[derive(Debug)]
pub struct AvailIter<'b, M> {
    mem: M,
    desc_table: GuestAddress,
    avail_ring: GuestAddress,
    queue_size: u16,
    last_index: Wrapping<u16>,
    next_avail: &'b mut Wrapping<u16>,
}

impl<'b, M> AvailIter<'b, M>
where
    M: Deref,
    M::Target: GuestMemory,
{
    // ...
    pub(crate) fn new(mem: M, idx: Wrapping<u16>, queue: &'b mut Queue) -> Result<Self, Error> {
        if (idx - queue.next_avail).0 > queue.size {
            return Err(Error::InvalidAvailRingIndex);
        }

        Ok(AvailIter {
            mem,
            desc_table: queue.desc_table,
            avail_ring: queue.avail_ring,
            queue_size: queue.size,
            last_index: idx,
            next_avail: &mut queue.next_avail,
        })
    }
    // ...
}

(4) 调用 AvailIter::next() 遍历 next_avail..last_index 之间的 entries,每次遍历会访问 desc ring 获取 descriptor(类型是 DescriptorChain)。

下面这段代码说明了是如何获得 DescriptorChain 这个对象的,上图展示了 avail ring 的布局。

对两个常量的解释:

  • VIRTQ_AVAIL_ELEMENT_SIZE: 值是 2,表示一个 entry(le16)的长度。
  • VIRTQ_AVAIL_RING_HEADER_SIZE: 值是 4,flagsle16)和 idxle16)的长度,表示 ring 的 header 的长度。

这个方法的流程是:

  • 计算 next_avail 对应 entry 的 offset。
  • 从内存中读取出 descriptor index 的值。
  • 创建 DescriptorChain 并返回。
fn next(&mut self) -> Option<Self::Item> {
    // 遍历退出条件:队列中无等待读取的数据
    if *self.next_avail == self.last_index {
        return None;
    }

    let elem_off =
        u64::from(self.next_avail.0.checked_rem(self.queue_size)?) * VIRTQ_AVAIL_ELEMENT_SIZE;
    let offset = VIRTQ_AVAIL_RING_HEADER_SIZE + elem_off;

    // 取出 index
    let addr = self.avail_ring.checked_add(offset)?;
    let head_index: u16 = self
        .mem
        .load(addr, Ordering::Acquire)
        .map(u16::from_le)
        .map_err(|_| error!("Failed to read from memory {:x}", addr.raw_value()))
        .ok()?;

    // next_avail +1
    *self.next_avail += Wrapping(1);

    // 仅返回 desc table item 的描述,并没有实际访问内存
    Some(DescriptorChain::new(
        self.mem.clone(),
        self.desc_table,
        self.queue_size,
        head_index,
    ))
}

一顿操作猛如虎,已经从 avail ring 中拿到了 descriptor(也就是 DescriptorChain),接下来就是读取 buffer 加私有业务逻辑了。比如我们看的是 virtiofs,那么就是与 fs 相关的业务逻辑,如果我们看 viritio-net,那就是与网络相关的业务逻辑。注意:这篇文章的目的是介绍通用的 virtio 流程,接下来重点是介绍从 DescriptorChain 中读取 buffer,尽量避免或者少讲 virtiofs 私有逻辑。

(5)调用 Reader::from_descriptor_chain() 读取 readable buffers,即没有 VRING_DESC_F_WRITE flag 的 descriptors。

由于遍历 descriptor chain 的操作是顺序的,先读 readable buffer 再读 writable buffer 说明 writable buffer 是置于 readable buffer 之后的。

from_descriptor_chain() 的代码如下所示。

pub fn from_descriptor_chain<M>(
    mem: &'a M::Target,
    desc_chain: DescriptorChain<M>,
) -> Result<Reader<'a, MS<'a, M::Target>>>
where
    M: Deref,
    M::Target: GuestMemory + Sized,
{
    let mut total_len: usize = 0;
    // (5.1)
    let mut buffers = VecDeque::with_capacity(64);
    // (5.2)
    for desc in desc_chain.readable() {
        // ...
        // (5.3)
        let region = mem
            .find_region(desc.addr())
            .ok_or(Error::FindMemoryRegion)?;
        let offset = desc
            .addr()
            .checked_sub(region.start_addr().raw_value())
            .unwrap();

        // (5.4)
        buffers.push_back(
            region
                .get_slice(MemoryRegionAddress(offset.raw_value()), desc.len() as usize)
                .map_err(Error::GuestMemoryError)?,
        );
    }

    Ok(Reader {
        buffers: IoBuffers {
            buffers,
            bytes_consumed: 0,
        },
    })
}

(5.1) buffers 默认生成了一个长度是 64 的数组,每个元素的类型是 VecDeque<vm_memory::VolatileSlice<'_, <<<M as Deref>::Target as GuestMemory>::R as GuestMemoryRegion>::B>>,简单地理解为 buffer 的指针就行。当然这个是可以扩容的,但是频繁扩容会有性能影响。

(5.2) 调用了 DiscriptorChain::readable() 方法生成了一个 DescriptorChainRwIter 实例,其 writable 字段被设置为 false

pub fn readable(self) -> DescriptorChainRwIter<M> {
    DescriptorChainRwIter {
        chain: self,
        writable: false,
    }
}

每一次遍历都调用的是 DescriptorChainRwIter::next(),在 DescriptorChain::next() 的基础上增加了读写权限的判断。

// DescriptorChainRwIter::next()
fn next(&mut self) -> Option<Self::Item> {
    loop {
        match self.chain.next() {
            Some(v) => {
                if v.is_write_only() == self.writable {
                    return Some(v);
                }
            }
            None => return None,
        }
    }
}

// DescriptorChain::next()
fn next(&mut self) -> Option<Self::Item> {
    if self.ttl == 0 || self.next_index >= self.queue_size {
        return None;
    }

    // (5.2.1)
    let desc_addr = self
        .desc_table
        .checked_add(self.next_index as u64 * size_of::<Descriptor>() as u64)?;

    // (5.2.2)
    let desc = self.mem.read_obj::<Descriptor>(desc_addr).ok()?;
    // ...
    if desc.has_next() {
        // (5.2.3)
        self.next_index = desc.next();
        self.ttl -= 1;
    } else {
        self.ttl = 0;
    }

    Some(desc)
}

(5.2.1) self.next_index 初始值是从 avail queue 获取的 descriptor index,之后都是读取的 virtq_desc.nextdesc_addr 是 descriptor 的起始地址。

(5.2.2) 是根据起始地址从内存中读出 Descriptor

(5.2.3) 如果 chain 没有读完,即 has_next() 返回 true,那么就设置 next_index 的值为下一个 descriptor index。

(5.3) 根据当前的 descriptor 计算出 buffer 所在的内存区域(region)和区域内偏移(offset)。

(5.4) 将最终 buffer 转化为一个字节的 slice 并 push 到 buffers 中。

(6) 与 (5) 非常相似,读取 desc ring 的 writable buffers,这里就不再赘述了。

(7) handle_message() 是 virtiofs 的私有逻辑了,这里不同的设备(virtio-net、virio-mem 等)处理的逻辑都不尽相同。Virtiofs 支持的操作包括 Init、Write、Open 等等操作磁盘 I/O 的逻辑。总之就是从 Reader 中读取从 device 传来的数据,然后将结果写入到 Writer 的 buffer 中通过 virtqueue 回传给 device。

(8) 将 descriptor index 添加到 used_queue 中,used queue 的 ring 数据类型是 virtq_used_elem,除了 index 以外还有一个字段表示长度,表示本次使用的 buffers 的总长度。

(9) 设置 legacy irq 通知 driver,queue 有新的数据到来。

References

  1. https://zhuanlan.zhihu.com/p/547777878