use crate::client_factory::ClientFactoryAsync;
use crate::event::reader_group_state::ReaderGroupStateError::SyncError;
use crate::event::reader_group_state::{Offset, ReaderGroupStateError};
use crate::segment::reader::ReaderError::SegmentSealed;
use crate::segment::reader::{AsyncSegmentReader, ReaderError};
use snafu::{ResultExt, Snafu};
use pravega_client_retry::retry_result::Retryable;
use pravega_client_shared::{Reader, ScopedSegment, Segment, SegmentWithRange};
use pravega_wire_protocol::commands::{Command, EventCommand, TYPE_PLUS_LENGTH_SIZE};
use crate::sync::synchronizer::SynchronizerError;
use bytes::{Buf, BufMut, BytesMut};
use core::fmt;
use im::HashMap as ImHashMap;
use std::collections::{HashMap, HashSet};
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::runtime::Handle;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{mpsc, Mutex};
use tokio::time::timeout;
use tracing::{debug, error, info, warn};
type ReaderErrorWithOffset = (ReaderError, i64);
type SegmentReadResult = Result<SegmentDataBuffer, ReaderErrorWithOffset>;
const REBALANCE_INTERVAL: Duration = Duration::from_secs(10);
const UPDATE_OFFSET_INTERVAL: Duration = Duration::from_secs(3);
const READ_BUFFER_SIZE: i32 = 8 * 1024 * 1024; cfg_if::cfg_if! {
if #[cfg(test)] {
use crate::event::reader_group_state::MockReaderGroupState as ReaderGroupState;
} else {
use crate::event::reader_group_state::ReaderGroupState;
}
}
/// A single reader belonging to a reader group; reads events from the
/// segments assigned to it by the shared reader group state.
pub struct EventReader {
    /// Identity of this reader within its reader group.
    pub id: Reader,
    // Factory used to create async segment readers and reach the controller.
    factory: ClientFactoryAsync,
    // Receiving end of the channel fed by background segment-read tasks.
    rx: Receiver<SegmentReadResult>,
    // Sender cloned into each background segment-read task.
    tx: Sender<SegmentReadResult>,
    // Reader-local bookkeeping: owned slices, dished-out slices, stop channels.
    meta: ReaderState,
    // Shared, synchronized state of the whole reader group.
    rg_state: Arc<Mutex<ReaderGroupState>>,
}
/// Errors surfaced by `EventReader` operations.
#[derive(Debug, Snafu)]
pub enum EventReaderError {
    /// Wraps a failure coming from the shared reader group state.
    #[snafu(display("ReaderGroup State error: {}", source))]
    StateError { source: ReaderGroupStateError },
}
impl Drop for EventReader {
    /// Marks the reader offline in the reader group state when it is dropped,
    /// so its segments can be redistributed to the remaining readers.
    fn drop(&mut self) {
        info!("Reader {:?} is dropped", self.id);
        let r = Handle::try_current();
        let rg_state = self.rg_state.clone();
        let id = self.id.clone();
        // Move the reader-local state out of `self`; the cleanup task needs
        // ownership because it may outlive this call.
        let mut meta = mem::take(&mut self.meta);
        match r {
            Ok(handle) => {
                // Already inside a Tokio runtime: spawn the cleanup on it.
                // (Drop cannot be async and blocking on a runtime worker would
                // panic, so fire-and-forget is the best available option.)
                // Using `handle.spawn` directly replaces the original
                // `let _ = handle.enter(); tokio::spawn(...)`, whose enter
                // guard was dropped immediately and therefore had no effect.
                handle.spawn(async move {
                    EventReader::reader_offline_internal(id, rg_state, &mut meta)
                        .await
                        .expect("Reader Offline");
                });
                info!("Reader {:?} is marked as offline.", self.id);
            }
            Err(_) => {
                // No ambient runtime: create a temporary one and *block* until
                // the cleanup completes. The original used `rt.spawn` and then
                // dropped the runtime at end of scope, which cancels the
                // not-yet-finished task, so the reader was never actually
                // marked offline on this path.
                let rt = tokio::runtime::Runtime::new().expect("Create tokio runtime to drop reader");
                rt.block_on(async move {
                    EventReader::reader_offline_internal(id, rg_state, &mut meta)
                        .await
                        .expect("Reader Offline");
                });
                info!("Reader {:?} is marked as offline.", self.id);
            }
        }
    }
}
impl EventReader {
/// Creates a reader and joins it to the reader group.
///
/// Computes how many segments this reader should own, acquires up to that many
/// unassigned segments, then spawns one background read task per owned
/// segment before assembling the reader.
pub(crate) async fn init_reader(
    id: String,
    rg_state: Arc<Mutex<ReaderGroupState>>,
    factory: ClientFactoryAsync,
) -> Self {
    let reader = Reader::from(id);
    // Ask the group state how many more segments this reader may acquire.
    let new_segments_to_acquire = rg_state
        .lock()
        .await
        .compute_segments_to_acquire_or_release(&reader)
        .await
        .expect("should compute segments");
    if new_segments_to_acquire > 0 {
        for _ in 0..new_segments_to_acquire {
            if let Some(seg) = rg_state
                .lock()
                .await
                .assign_segment_to_reader(&reader)
                .await
                .expect("Error while waiting for segments to be assigned")
            {
                debug!("Acquiring segment {:?} for reader {:?}", seg, reader);
            } else {
                // No unassigned segments left to grab.
                debug!(
                    "No unassigned segments that can be acquired by the reader {:?}",
                    reader
                );
                break;
            }
        }
    }
    // Fetch the full set of owned segments (pre-existing + newly acquired)
    // together with their persisted read offsets.
    let mut assigned_segments = rg_state
        .lock()
        .await
        .get_segments_for_reader(&reader)
        .await
        .expect("Error while fetching currently assigned segments");
    let mut slice_meta_map: HashMap<ScopedSegment, SliceMetadata> = HashMap::new();
    // Seed slice metadata at each segment's persisted read offset.
    slice_meta_map.extend(assigned_segments.drain().map(|(seg, offset)| {
        (
            seg.clone(),
            SliceMetadata {
                scoped_segment: seg.to_string(),
                start_offset: offset.read,
                read_offset: offset.read,
                ..Default::default()
            },
        )
    }));
    let (tx, rx) = mpsc::channel(1);
    let mut stop_reading_map: HashMap<ScopedSegment, oneshot::Sender<()>> = HashMap::new();
    // Spawn one background read task per segment; each can be stopped
    // individually via its oneshot sender.
    slice_meta_map.iter().for_each(|(segment, meta)| {
        let (tx_stop, rx_stop) = oneshot::channel();
        stop_reading_map.insert(segment.clone(), tx_stop);
        factory.runtime_handle().spawn(SegmentSlice::get_segment_data(
            segment.clone(),
            meta.start_offset,
            tx.clone(),
            rx_stop,
            factory.clone(),
        ));
    });
    EventReader::init_event_reader(
        rg_state,
        reader,
        factory,
        tx,
        rx,
        slice_meta_map,
        stop_reading_map,
    )
}
// Assembles an `EventReader` from already-prepared components. Kept separate
// (and hidden) so tests can inject mock reader group state and channels.
#[doc(hidden)]
fn init_event_reader(
    rg_state: Arc<Mutex<ReaderGroupState>>,
    id: Reader,
    factory: ClientFactoryAsync,
    tx: Sender<SegmentReadResult>,
    rx: Receiver<SegmentReadResult>,
    segment_slice_map: HashMap<ScopedSegment, SliceMetadata>,
    slice_stop_reading: HashMap<ScopedSegment, oneshot::Sender<()>>,
) -> Self {
    // All "last activity" timestamps start at construction time.
    let now = Instant::now();
    let meta = ReaderState {
        slices: segment_slice_map,
        slices_dished_out: Default::default(),
        slice_release_receiver: HashMap::new(),
        slice_stop_reading,
        last_segment_release: now,
        last_segment_acquire: now,
        last_offset_update: now,
        reader_offline: false,
    };
    EventReader {
        id,
        factory,
        rx,
        tx,
        meta,
        rg_state,
    }
}
#[doc(hidden)]
#[cfg(feature = "integration-test")]
// Test hook: backdates the rebalance timers so integration tests can force
// the segment acquire/release paths without waiting out REBALANCE_INTERVAL.
pub fn set_last_acquire_release_time(&mut self, time: Instant) {
    self.meta.last_segment_release = time;
    self.meta.last_segment_acquire = time;
}
pub async fn release_segment(&mut self, mut slice: SegmentSlice) -> Result<(), EventReaderError> {
info!(
"releasing segment slice {} from reader {:?}",
slice.meta.scoped_segment, self.id
);
if self.meta.reader_offline {
return Err(EventReaderError::StateError {
source: ReaderGroupStateError::ReaderAlreadyOfflineError {
error_msg: format!("Reader already marked offline {:?}", self.id),
source: SynchronizerError::SyncPreconditionError {
error_msg: String::from("Precondition failure"),
},
},
});
}
let scoped_segment = ScopedSegment::from(slice.meta.scoped_segment.clone().as_str());
self.meta.add_slices(slice.meta.clone());
self.meta.slices_dished_out.remove(&scoped_segment);
if self.meta.last_segment_release.elapsed() > REBALANCE_INTERVAL {
debug!("try to rebalance segments across readers");
let read_offset = slice.meta.read_offset;
self.release_segment_from_reader(slice, read_offset).await?;
self.meta.last_segment_release = Instant::now();
} else {
debug!(" slice return to rx success {:?} ", slice.meta);
if let Some(tx) = slice.slice_return_tx.take() {
if let Err(_e) = tx.send(Some(slice.meta.clone())) {
warn!(
"Failed to send segment slice release data for slice {:?}",
slice.meta
);
}
} else {
panic!("This is unexpected, No sender for SegmentSlice present.");
}
}
if self.meta.last_offset_update.elapsed() > UPDATE_OFFSET_INTERVAL {
let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
for metadata in self.meta.slices.values() {
offset_map.insert(
ScopedSegment::from(metadata.scoped_segment.as_str()),
Offset::new(metadata.read_offset),
);
}
debug!(
" update reader position {:?} for reader {:?} ",
offset_map, self.id
);
self.rg_state
.lock()
.await
.update_reader_positions(&self.id, offset_map)
.await
.context(StateError {})?;
self.meta.last_offset_update = Instant::now();
}
Ok(())
}
/// Returns a slice to the reader, repositioning the next read at `offset`.
///
/// If `offset` differs from the slice's current read offset, the in-flight
/// background fetch is stopped and restarted from `offset`, discarding the
/// buffered data; otherwise this behaves exactly like `release_segment`.
///
/// # Panics
/// Panics if `offset` is negative or outside the slice's
/// `[start_offset, end_offset]` window.
///
/// # Errors
/// `EventReaderError::StateError` if the reader is already offline.
pub async fn release_segment_at(
    &mut self,
    slice: SegmentSlice,
    offset: i64,
) -> Result<(), EventReaderError> {
    info!(
        "releasing segment slice {} at offset {}",
        slice.meta.scoped_segment, offset
    );
    assert!(
        offset >= 0,
        "the offset where the segment slice is released should be a positive number"
    );
    assert!(
        slice.meta.start_offset <= offset,
        "the offset where the segment slice is released should be greater than the start offset"
    );
    assert!(
        slice.meta.end_offset >= offset,
        "the offset where the segment slice is released should be less than the end offset"
    );
    if self.meta.reader_offline {
        return Err(EventReaderError::StateError {
            source: ReaderGroupStateError::ReaderAlreadyOfflineError {
                error_msg: format!("Reader already marked offline {:?}", self.id),
                source: SynchronizerError::SyncPreconditionError {
                    error_msg: String::from("Precondition failure"),
                },
            },
        });
    }
    let segment = ScopedSegment::from(slice.meta.scoped_segment.as_str());
    if slice.meta.read_offset != offset {
        // Reposition: stop the current background fetch and restart at
        // `offset` with an empty buffer.
        self.meta.stop_reading(&segment);
        // NOTE(review): start_offset is reset to the slice's read_offset here
        // rather than kept at the original start_offset — confirm intentional.
        let slice_meta = SliceMetadata {
            start_offset: slice.meta.read_offset,
            scoped_segment: slice.meta.scoped_segment.clone(),
            last_event_offset: slice.meta.last_event_offset,
            read_offset: offset,
            end_offset: slice.meta.end_offset,
            segment_data: SegmentDataBuffer::empty(),
            partial_data_present: false,
        };
        let (tx_drop_fetch, rx_drop_fetch) = oneshot::channel();
        tokio::spawn(SegmentSlice::get_segment_data(
            segment.clone(),
            slice_meta.read_offset,
            self.tx.clone(),
            rx_drop_fetch,
            self.factory.clone(),
        ));
        self.meta.add_stop_reading_tx(segment.clone(), tx_drop_fetch);
        self.meta.add_slices(slice_meta);
        self.meta.slices_dished_out.remove(&segment);
    } else {
        self.release_segment(slice).await?;
    }
    Ok(())
}
/// Shared implementation for taking a reader offline; used by both
/// `reader_offline` and `Drop`.
///
/// Stops all background reads, closes slice-return channels, collects the last
/// known read offset of every owned/dished-out slice, and removes the reader
/// from the group state with those offsets. A reader that is already offline
/// is treated as success.
async fn reader_offline_internal(
    reader_id: Reader,
    rg_state: Arc<Mutex<ReaderGroupState>>,
    meta: &mut ReaderState,
) -> Result<(), EventReaderError> {
    if !meta.reader_offline && rg_state.lock().await.check_online(&reader_id).await {
        info!("static Putting reader {:?} offline", reader_id);
        meta.stop_reading_all();
        meta.close_all_slice_return_channel();
        // Offsets from slices currently lent out to the application...
        let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
        for (seg, slice_meta) in meta.slices_dished_out.drain() {
            offset_map.insert(seg, Offset::new(slice_meta.read_offset));
        }
        // ...overridden by offsets from slices still held by the reader.
        for meta in meta.slices.values() {
            offset_map.insert(
                ScopedSegment::from(meta.scoped_segment.as_str()),
                Offset::new(meta.read_offset),
            );
        }
        match rg_state.lock().await.remove_reader(&reader_id, offset_map).await {
            Ok(()) => {
                meta.reader_offline = true;
                Ok(())
            }
            Err(e) => match e {
                // Someone else already removed this reader: not an error.
                ReaderGroupStateError::ReaderAlreadyOfflineError { .. } => {
                    meta.reader_offline = true;
                    info!("staticReader {:?} is already offline", reader_id);
                    Ok(())
                }
                state_err => Err(EventReaderError::StateError { source: state_err }),
            },
        }?
    }
    Ok(())
}
/// Marks this reader offline in the reader group, persisting the current read
/// offsets so another reader can resume where this one stopped.
pub async fn reader_offline(&mut self) -> Result<(), EventReaderError> {
    // Move the reader-local state out of `self`; a default ReaderState is
    // left behind since the reader is no longer usable once offline.
    let mut state = mem::take(&mut self.meta);
    Self::reader_offline_internal(self.id.clone(), self.rg_state.clone(), &mut state).await
}
/// Releases the slice's segment back to the reader group when this reader
/// owns more than its fair share; otherwise a no-op (beyond the offline
/// check).
///
/// # Errors
/// `EventReaderError::StateError` if the reader is offline or a group-state
/// update fails.
async fn release_segment_from_reader(
    &mut self,
    mut slice: SegmentSlice,
    read_offset: i64,
) -> Result<(), EventReaderError> {
    if self.meta.reader_offline {
        return Err(EventReaderError::StateError {
            source: ReaderGroupStateError::ReaderAlreadyOfflineError {
                error_msg: format!("Reader already marked offline {:?}", self.id),
                source: SynchronizerError::SyncPreconditionError {
                    error_msg: String::from("Precondition failure"),
                },
            },
        });
    }
    // A negative result means this reader should give up segments.
    let new_segments_to_release = self
        .rg_state
        .lock()
        .await
        .compute_segments_to_acquire_or_release(&self.id)
        .await
        .map_err(|err| EventReaderError::StateError { source: err })?;
    let segment = ScopedSegment::from(slice.meta.scoped_segment.as_str());
    if new_segments_to_release < 0 {
        // Stop fetching, drop local bookkeeping, and notify any waiter that
        // the slice will not come back (send None).
        self.meta.stop_reading(&segment);
        self.meta
            .slices
            .remove(&segment)
            .expect("Segment missing in meta while releasing from reader");
        if let Some(tx) = slice.slice_return_tx.take() {
            if let Err(_e) = tx.send(None) {
                warn!(
                    "Failed to send segment slice release data for slice {:?}",
                    slice.meta
                );
            }
        } else {
            panic!("This is unexpected, No sender for SegmentSlice present.");
        }
        // Persist the release (with the final read offset) to the group state.
        self.rg_state
            .lock()
            .await
            .release_segment(&self.id, &segment, &Offset::new(read_offset))
            .await
            .context(StateError {})?;
    }
    Ok(())
}
/// Obtains the next slice that has events ready to read, or `None` when
/// nothing becomes ready within roughly one second.
///
/// Along the way this method (a) periodically persists read offsets, (b)
/// periodically tries to acquire more segments for balance, (c) prefers
/// already-buffered slices, and otherwise (d) waits briefly on the channel fed
/// by the background read tasks, folding received data or errors into slice
/// state.
///
/// # Errors
/// `EventReaderError::StateError` if the reader is offline / the group is
/// gone, or a group-state update fails.
pub async fn acquire_segment(&mut self) -> Result<Option<SegmentSlice>, EventReaderError> {
    if self.meta.reader_offline || !self.rg_state.lock().await.check_online(&self.id).await {
        return Err(EventReaderError::StateError {
            source: ReaderGroupStateError::ReaderAlreadyOfflineError {
                error_msg: format!(
                    "Reader already marked offline {:?} or the ReaderGroup is deleted",
                    self.id
                ),
                source: SynchronizerError::SyncPreconditionError {
                    error_msg: String::from("Precondition failure"),
                },
            },
        });
    }
    // Periodically persist this reader's positions to the group state.
    if self.meta.last_offset_update.elapsed() > UPDATE_OFFSET_INTERVAL {
        let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
        for metadata in self.meta.slices.values() {
            offset_map.insert(
                ScopedSegment::from(metadata.scoped_segment.as_str()),
                Offset::new(metadata.read_offset),
            );
        }
        debug!(
            " update reader position {:?} for reader {:?} ",
            offset_map, self.id
        );
        self.rg_state
            .lock()
            .await
            .update_reader_positions(&self.id, offset_map)
            .await
            .context(StateError {})?;
        self.meta.last_offset_update = Instant::now();
    }
    info!("acquiring segment for reader {:?}", self.id);
    // Periodically try to pick up additional segments for this reader.
    if self.meta.last_segment_acquire.elapsed() > REBALANCE_INTERVAL {
        info!("need to rebalance segments across readers");
        let res = self.assign_segments_to_reader().await.context(StateError {})?;
        if let Some(new_segments) = res {
            let current_segments = self
                .rg_state
                .lock()
                .await
                .get_segments_for_reader(&self.id)
                .await
                .map_err(|e| SyncError {
                    error_msg: format!("failed to get segments for reader {:?}", self.id),
                    source: e,
                })
                .context(StateError {})?;
            // Keep only the just-acquired segments, paired with their offsets.
            let new_segments: HashSet<(ScopedSegment, Offset)> = current_segments
                .into_iter()
                .filter(|(seg, _off)| new_segments.contains(seg))
                .collect();
            debug!("segments which can be read next are {:?}", new_segments);
            self.initiate_segment_reads(new_segments);
            self.meta.last_segment_acquire = Instant::now();
        }
    }
    // Fast path: a slice that already has buffered events.
    if let Some(segment_with_data) = self.meta.get_segment_id_with_data() {
        info!("segment {} has data ready to read", segment_with_data);
        let slice_meta = self.meta.slices.remove(&segment_with_data).unwrap();
        let segment = ScopedSegment::from(slice_meta.scoped_segment.as_str());
        // Channel on which the application hands the slice back.
        let (slice_return_tx, slice_return_rx) = oneshot::channel();
        self.meta.add_slice_release_receiver(segment, slice_return_rx);
        info!(
            "segment slice for {:?} is ready for consumption by reader {}",
            slice_meta.scoped_segment, self.id,
        );
        self.meta
            .slices_dished_out
            .insert(segment_with_data, slice_meta.copy_meta());
        Ok(Some(SegmentSlice {
            meta: slice_meta,
            slice_return_tx: Some(slice_return_tx),
        }))
    } else if let Ok(option) = timeout(Duration::from_millis(1000), self.rx.recv()).await {
        // No buffered slice: wait up to 1s for the background read tasks.
        if let Some(read_result) = option {
            match read_result {
                Ok(data) => {
                    let segment = ScopedSegment::from(data.segment.clone().as_str());
                    info!("new data fetched from server for segment {:?}", segment);
                    if let Some(mut slice_meta) = self.meta.remove_segment(segment.clone()).await {
                        // Discard data that does not continue the buffer.
                        if data.offset_in_segment
                            != slice_meta.read_offset + slice_meta.segment_data.value.len() as i64
                        {
                            info!("Data from an invalid offset {:?} observed. Expected offset {:?}. Ignoring this data", data.offset_in_segment, slice_meta.read_offset);
                            Ok(None)
                        } else {
                            EventReader::add_data_to_segment_slice(data, &mut slice_meta);
                            let (slice_return_tx, slice_return_rx) = oneshot::channel();
                            self.meta
                                .add_slice_release_receiver(segment.clone(), slice_return_rx);
                            self.meta
                                .slices_dished_out
                                .insert(segment.clone(), slice_meta.copy_meta());
                            info!(
                                "segment slice for {:?} is ready for consumption by reader {}",
                                slice_meta, self.id,
                            );
                            Ok(Some(SegmentSlice {
                                meta: slice_meta,
                                slice_return_tx: Some(slice_return_tx),
                            }))
                        }
                    } else {
                        debug!("ignore the received data since None was returned");
                        Ok(None)
                    }
                }
                Err((e, offset)) => {
                    let segment = ScopedSegment::from(e.get_segment().as_str());
                    debug!(
                        "Reader Error observed {:?} on segment {:?} at offset {:?} ",
                        e, segment, offset
                    );
                    if let Some(slice_meta) = self.meta.remove_segment(segment.clone()).await {
                        if slice_meta.read_offset != offset {
                            // Error does not match our current read position:
                            // keep the slice and ignore the error.
                            info!("Error at an invalid offset {:?} observed. Expected offset {:?}. Ignoring this data", offset, slice_meta.start_offset);
                            self.meta.add_slices(slice_meta);
                            self.meta.slices_dished_out.remove(&segment);
                        } else {
                            // Genuine end-of-segment or failure: let
                            // fetch_successors decide what happens next.
                            info!("Segment slice {:?} has received error {:?}", slice_meta, e);
                            self.fetch_successors(e).await.context(StateError {})?;
                        }
                    }
                    debug!("Segment Slice meta {:?}", self.meta.slices);
                    Ok(None)
                }
            }
        } else {
            warn!("Error getting updates from segment slice for reader {}", self.id);
            Ok(None)
        }
    } else {
        info!(
            "reader {} owns {} slices but none is ready to read",
            self.id,
            self.meta.slices.len()
        );
        Ok(None)
    }
}
/// Handles a terminal read error on a segment. For sealed/truncated segments
/// it marks the segment completed in the group state, fetches the successor
/// segments from the controller, and starts reading any successors assigned
/// to this reader. Any other error is only logged.
async fn fetch_successors(&mut self, e: ReaderError) -> Result<(), ReaderGroupStateError> {
    match e {
        ReaderError::SegmentSealed {
            segment,
            can_retry: _,
            operation: _,
            error_msg: _,
        }
        | ReaderError::SegmentIsTruncated {
            segment,
            can_retry: _,
            operation: _,
            error_msg: _,
        } => {
            let completed_scoped_segment = ScopedSegment::from(segment.as_str());
            // Stop the background fetch for the finished segment.
            self.meta.stop_reading(&completed_scoped_segment);
            let successors = self
                .factory
                .controller_client()
                .get_successors(&completed_scoped_segment)
                .await
                .expect("Failed to fetch successors of the segment")
                .segment_with_predecessors;
            info!("Segment Completed {:?}", segment);
            self.rg_state
                .lock()
                .await
                .segment_completed(&self.id, &completed_scoped_segment, &successors)
                .await?;
            // Try to pick up successor segments and start reading those
            // assigned to this reader.
            let option = self.assign_segments_to_reader().await?;
            if let Some(new_segments) = option {
                let current_segments = self
                    .rg_state
                    .lock()
                    .await
                    .get_segments_for_reader(&self.id)
                    .await
                    .map_err(|e| SyncError {
                        error_msg: format!("Failed to fetch segments for reader {:?}", self.id),
                        source: e,
                    })?;
                // Restrict to the segments acquired just now.
                let new_segments: HashSet<(ScopedSegment, Offset)> = current_segments
                    .into_iter()
                    .filter(|(seg, _off)| new_segments.contains(seg))
                    .collect();
                debug!("Segments which can be read next are {:?}", new_segments);
                self.initiate_segment_reads(new_segments);
            }
        }
        _ => error!("Error observed while reading from Pravega {:?}", e),
    };
    Ok(())
}
/// Attempts to acquire unassigned segments for this reader, up to the count
/// suggested by the reader group's balancing computation.
///
/// Returns `Ok(None)` when the reader already owns its fair share (or more),
/// otherwise `Ok(Some(segments))` with the newly acquired segments (possibly
/// empty if no unassigned segments remained).
///
/// # Errors
/// Propagates `ReaderGroupStateError` from the group-state calls. (The
/// original panicked via `expect` on the balance computation even though this
/// function already returns exactly that error type — now propagated with
/// `?`, matching how `assign_segment_to_reader` errors are handled below.)
async fn assign_segments_to_reader(&self) -> Result<Option<Vec<ScopedSegment>>, ReaderGroupStateError> {
    let mut new_segments: Vec<ScopedSegment> = Vec::new();
    let new_segments_to_acquire = self
        .rg_state
        .lock()
        .await
        .compute_segments_to_acquire_or_release(&self.id)
        .await?;
    if new_segments_to_acquire <= 0 {
        Ok(None)
    } else {
        for _ in 0..new_segments_to_acquire {
            if let Some(seg) = self
                .rg_state
                .lock()
                .await
                .assign_segment_to_reader(&self.id)
                .await?
            {
                debug!("Acquiring segment {:?} for reader {:?}", seg, self.id);
                new_segments.push(seg);
            } else {
                // No more unassigned segments are available.
                break;
            }
        }
        debug!("Segments acquired by reader {:?} are {:?}", self.id, new_segments);
        Ok(Some(new_segments))
    }
}
/// Kicks off a background read task for every newly acquired segment and
/// registers the bookkeeping (stop channel + slice metadata) for each.
fn initiate_segment_reads(&mut self, new_segments: HashSet<(ScopedSegment, Offset)>) {
    for (segment, offset) in new_segments {
        let slice_meta = SliceMetadata {
            scoped_segment: segment.to_string(),
            start_offset: offset.read,
            read_offset: offset.read,
            ..Default::default()
        };
        // One-shot channel used later to tell the background task to stop.
        let (stop_tx, stop_rx) = oneshot::channel();
        tokio::spawn(SegmentSlice::get_segment_data(
            segment.clone(),
            slice_meta.start_offset,
            self.tx.clone(),
            stop_rx,
            self.factory.clone(),
        ));
        self.meta.add_stop_reading_tx(segment, stop_tx);
        self.meta.add_slices(slice_meta);
    }
}
/// Folds freshly read segment data into the slice's buffer: replaces an empty
/// buffer outright, otherwise appends.
fn add_data_to_segment_slice(data: SegmentDataBuffer, slice: &mut SliceMetadata) {
    if slice.segment_data.value.is_empty() {
        slice.segment_data = data;
    } else {
        slice.segment_data.value.put(data.value);
    }
    // New data arrived, so a previously flagged partial event may now be
    // completable; extract_event will re-set the flag if data is still short.
    // The original cleared this flag only on the append branch, so a slice
    // whose empty buffer was replaced kept a stale `partial_data_present`
    // and `has_events()` stayed false despite readable data.
    slice.partial_data_present = false;
}
/// Queries the controller for the successors of a completed segment,
/// returning the successor-to-predecessor mapping.
///
/// # Panics
/// Panics if the controller call fails.
async fn get_successors(
    &mut self,
    completed_scoped_segment: &str,
) -> ImHashMap<SegmentWithRange, Vec<Segment>> {
    let completed_scoped_segment = ScopedSegment::from(completed_scoped_segment);
    self.factory
        .controller_client()
        .get_successors(&completed_scoped_segment)
        .await
        .expect("Failed to fetch successors of the segment")
        .segment_with_predecessors
}
}
/// Reader-local bookkeeping, separate from the shared reader group state.
struct ReaderState {
    // Slices currently held by the reader (not lent to the application).
    slices: HashMap<ScopedSegment, SliceMetadata>,
    // Metadata snapshots of slices currently lent out via acquire_segment.
    slices_dished_out: HashMap<ScopedSegment, SliceMetadata>,
    // Channels on which lent-out slices are handed back.
    slice_release_receiver: HashMap<ScopedSegment, oneshot::Receiver<Option<SliceMetadata>>>,
    // Senders used to stop the background read task of each segment.
    slice_stop_reading: HashMap<ScopedSegment, oneshot::Sender<()>>,
    // Timestamps driving the periodic rebalance / offset-persist logic.
    last_segment_release: Instant,
    last_segment_acquire: Instant,
    last_offset_update: Instant,
    // Set once the reader has been removed from the group.
    reader_offline: bool,
}
impl Default for ReaderState {
fn default() -> Self {
ReaderState {
slices: HashMap::new(),
slices_dished_out: HashMap::new(),
slice_release_receiver: HashMap::new(),
slice_stop_reading: HashMap::new(),
last_segment_release: Instant::now(),
last_segment_acquire: Instant::now(),
last_offset_update: Instant::now(),
reader_offline: false,
}
}
}
impl ReaderState {
    /// Registers the channel on which a dished-out slice will be handed back.
    fn add_slice_release_receiver(
        &mut self,
        scoped_segment: ScopedSegment,
        slice_return_rx: oneshot::Receiver<Option<SliceMetadata>>,
    ) {
        self.slice_release_receiver.insert(scoped_segment, slice_return_rx);
    }

    /// Waits until the application returns the slice for `segment`, yielding
    /// its (possibly updated) metadata. Falls back to the dished-out snapshot
    /// if the return channel was dropped without a send.
    async fn wait_for_segment_slice_return(&mut self, segment: &ScopedSegment) -> Option<SliceMetadata> {
        let receiver = match self.slice_release_receiver.remove(segment) {
            Some(rx) => rx,
            None => {
                warn!(
                    "Invalid segment {:?} provided for while waiting for segment slice return",
                    segment
                );
                return None;
            }
        };
        match receiver.await {
            Ok(returned_meta) => {
                debug!("SegmentSlice returned {:?}", returned_meta);
                returned_meta
            }
            Err(e) => {
                error!(
                    "Error Segment slice was not returned for segment {:?}. Error {:?} ",
                    segment, e
                );
                self.slices_dished_out.remove(segment)
            }
        }
    }

    /// Closes every outstanding slice-return channel (used when going offline).
    fn close_all_slice_return_channel(&mut self) {
        self.slice_release_receiver
            .drain()
            .for_each(|(_, mut rx)| rx.close());
    }

    /// Removes and returns the slice metadata for `segment`, first waiting for
    /// the application to hand the slice back if it is currently dished out.
    async fn remove_segment(&mut self, segment: ScopedSegment) -> Option<SliceMetadata> {
        if let Some(meta) = self.slices.remove(&segment) {
            debug!(
                "Segment slice {:?} has not been dished out for consumption {:?} meta",
                &segment, meta
            );
            Some(meta)
        } else {
            debug!(
                "Segment slice for {:?} has already been dished out for consumption",
                &segment
            );
            self.wait_for_segment_slice_return(&segment).await
        }
    }

    /// Stores slice metadata; a slice for that segment must not already exist.
    fn add_slices(&mut self, meta: SliceMetadata) {
        let key = ScopedSegment::from(meta.scoped_segment.as_str());
        let previous = self.slices.insert(key, meta);
        if previous.is_some() {
            panic!("Pre-condition check failure. Segment slice already present");
        }
    }

    /// Stores the sender used to stop the background read for `segment`.
    fn add_stop_reading_tx(&mut self, segment: ScopedSegment, tx: oneshot::Sender<()>) {
        assert!(
            self.slice_stop_reading.insert(segment, tx).is_none(),
            "Pre-condition check failure. Sender used to stop fetching data is already present"
        );
    }

    /// Signals the background read task for `segment` to stop, if any.
    fn stop_reading(&mut self, segment: &ScopedSegment) {
        if let Some(tx) = self.slice_stop_reading.remove(segment) {
            if tx.send(()).is_err() {
                debug!("Channel already closed, ignoring the error");
            }
        }
    }

    /// Signals every background read task to stop.
    fn stop_reading_all(&mut self) {
        self.slice_stop_reading.drain().for_each(|(_, tx)| {
            if tx.send(()).is_err() {
                debug!("Channel already closed, ignoring the error");
            }
        });
    }

    /// Returns some segment whose slice has a complete event buffered, if any.
    fn get_segment_id_with_data(&self) -> Option<ScopedSegment> {
        for (segment, meta) in &self.slices {
            if meta.has_events() {
                return Some(segment.clone());
            }
        }
        None
    }
}
/// An event extracted from a segment slice: its payload bytes plus the offset
/// in the segment at which it starts.
#[derive(Debug)]
pub struct Event {
    /// Offset of this event within its segment.
    pub offset_in_segment: i64,
    /// Raw event payload bytes.
    pub value: Vec<u8>,
}
/// A chunk of one segment's data handed to the application; iterate it to get
/// events. When released (or dropped) its metadata travels back to the reader
/// through `slice_return_tx`.
#[derive(Default)]
pub struct SegmentSlice {
    /// Offsets and buffered data for this slice.
    pub meta: SliceMetadata,
    // One-shot channel used to hand the slice's metadata back to the reader.
    pub(crate) slice_return_tx: Option<oneshot::Sender<Option<SliceMetadata>>>,
}
impl SegmentSlice {
/// Creates a slice covering `segment` from `start_offset` to the end of the
/// segment, with an empty buffer and the given return channel.
fn new(
    segment: ScopedSegment,
    start_offset: i64,
    slice_return_tx: oneshot::Sender<Option<SliceMetadata>>,
) -> Self {
    let meta = SliceMetadata {
        scoped_segment: segment.to_string(),
        start_offset,
        read_offset: start_offset,
        last_event_offset: 0,
        end_offset: i64::MAX,
        segment_data: SegmentDataBuffer::empty(),
        partial_data_present: false,
    };
    SegmentSlice {
        meta,
        slice_return_tx: Some(slice_return_tx),
    }
}
/// Background task: reads `segment` from `start_offset` in READ_BUFFER_SIZE
/// chunks and pushes each chunk (or a terminal error) into `tx`, until told
/// to stop via `drop_fetch`, the segment ends, or a non-retryable error.
async fn get_segment_data(
    segment: ScopedSegment,
    start_offset: i64,
    tx: Sender<SegmentReadResult>,
    mut drop_fetch: oneshot::Receiver<()>,
    factory: ClientFactoryAsync,
) {
    let mut offset: i64 = start_offset;
    let segment_reader = factory.create_async_segment_reader(segment.clone()).await;
    loop {
        // Stop when signalled, or when the stop sender was dropped.
        if let Ok(_) | Err(TryRecvError::Closed) = drop_fetch.try_recv() {
            info!("Stop reading from the segment");
            break;
        }
        debug!(
            "Send read request to Segment store at offset {:?} with length {:?}",
            offset, READ_BUFFER_SIZE
        );
        let read = segment_reader.read(offset, READ_BUFFER_SIZE).await;
        match read {
            Ok(reply) => {
                let len = reply.data.len();
                debug!("read data length of {}", len);
                if len == 0 && reply.end_of_segment {
                    info!("Reached end of segment {:?} during read ", segment.clone());
                    // End of segment is reported to the reader as a
                    // SegmentSealed error carrying the final offset.
                    let data = SegmentSealed {
                        segment: segment.to_string(),
                        can_retry: false,
                        operation: "read segment".to_string(),
                        error_msg: "reached the end of stream".to_string(),
                    };
                    if let Err(e) = tx.send(Err((data, offset))).await {
                        warn!("Error while sending segment data to event parser {:?} ", e);
                        break;
                    }
                    drop(tx);
                    break;
                } else {
                    let segment_data = bytes::BytesMut::from(reply.data.as_slice());
                    let data = SegmentDataBuffer {
                        segment: segment.to_string(),
                        offset_in_segment: offset,
                        value: segment_data,
                    };
                    if let Err(e) = tx.send(Ok(data)).await {
                        info!("Error while sending segment data to event parser {:?} ", e);
                        break;
                    }
                    offset += len as i64;
                }
            }
            Err(e) => {
                warn!("Error while reading from segment {:?}", e);
                // Retryable errors: loop and try the same offset again.
                if !e.can_retry() {
                    let _s = tx.send(Err((e, offset))).await;
                    break;
                }
            }
        }
    }
}
/// Returns the offset at which this slice started reading.
fn get_starting_offset(&self) -> i64 {
    self.meta.start_offset
}
/// Returns the fully qualified name of the segment this slice covers.
fn get_segment(&self) -> String {
    self.meta.scoped_segment.clone()
}
/// Attempts to extract one complete event from the buffered segment data,
/// using `parse_header` to peek at the next event header. Returns `None` (and
/// marks partial data present) when the buffer does not yet hold a full event.
fn extract_event(
    &mut self,
    parse_header: fn(&mut SegmentDataBuffer) -> Option<SegmentDataBuffer>,
) -> Option<Event> {
    if let Some(mut event_data) = parse_header(&mut self.meta.segment_data) {
        // The header parser sizes the returned buffer to the event length.
        let bytes_to_read = event_data.value.capacity();
        if bytes_to_read == 0 {
            warn!("Found a header with length as zero");
            return None;
        }
        if self.meta.segment_data.value.remaining() >= bytes_to_read + TYPE_PLUS_LENGTH_SIZE as usize {
            // Skip the header, then carve out exactly the event payload.
            self.meta.segment_data.advance(TYPE_PLUS_LENGTH_SIZE as usize);
            let t = self.meta.segment_data.split_to(bytes_to_read);
            event_data.value.put(t.value);
            info!("extract event data with length {}", event_data.value.len());
            let event = Event {
                offset_in_segment: event_data.offset_in_segment,
                value: event_data.value.freeze().to_vec(),
            };
            Some(event)
        } else {
            // Header seen but payload incomplete: wait for more data.
            debug!(
                "partial event read: data read length {}, target read length {}",
                event_data.value.len(),
                event_data.value.capacity()
            );
            self.meta.partial_data_present = true;
            None
        }
    } else {
        // Not even a full header buffered yet.
        self.meta.partial_data_present = true;
        None
    }
}
/// Peeks at the next event header (type code + length) without consuming any
/// bytes from `data`. Returns a buffer pre-sized for the event payload, or
/// `None` when fewer than TYPE_PLUS_LENGTH_SIZE bytes are buffered.
///
/// # Panics
/// Panics if the type code is not `EventCommand::TYPE_CODE`.
fn read_header(data: &mut SegmentDataBuffer) -> Option<SegmentDataBuffer> {
    if data.value.len() >= TYPE_PLUS_LENGTH_SIZE as usize {
        let event_offset = data.offset_in_segment;
        // Read through a temporary view so `data` itself is not advanced.
        let mut bytes_temp = data.value.bytes();
        let type_code = bytes_temp.get_i32();
        let len = bytes_temp.get_i32();
        assert_eq!(type_code, EventCommand::TYPE_CODE, "Expected EventCommand here.");
        debug!("Event size is {}", len);
        Some(SegmentDataBuffer {
            segment: data.segment.clone(),
            offset_in_segment: event_offset,
            value: BytesMut::with_capacity(len as usize),
        })
    } else {
        None
    }
}
/// True when no complete event is ready: the buffer is empty or ends in a
/// partial event.
pub fn is_empty(&self) -> bool {
    self.meta.segment_data.value.is_empty() || self.meta.partial_data_present
}
}
impl Iterator for SegmentSlice {
    type Item = Event;

    /// Yields the next complete event from the buffered data, updating the
    /// slice's last-event and read offsets; `None` when no complete event is
    /// currently buffered (end of slice data, or a partial event pending).
    fn next(&mut self) -> Option<Self::Item> {
        let res = self.extract_event(SegmentSlice::read_header);
        match res {
            Some(event) => {
                self.meta.last_event_offset = event.offset_in_segment;
                // Next read position = event start + payload + header size.
                self.meta.read_offset =
                    event.offset_in_segment + event.value.len() as i64 + TYPE_PLUS_LENGTH_SIZE as i64;
                if !self.meta.is_empty() {
                    // Sanity check: the remaining buffer must start exactly
                    // at the computed read offset.
                    assert_eq!(
                        self.meta.read_offset, self.meta.segment_data.offset_in_segment,
                        "Error in offset computation"
                    );
                }
                Some(event)
            }
            None => {
                if self.meta.is_empty() {
                    info!(
                        "Finished reading events from the segment slice of {:?}",
                        self.meta.scoped_segment
                    );
                } else {
                    info!("Partial event present in the segment slice of {:?}, this will be returned post a new read request", self.meta.scoped_segment);
                }
                None
            }
        }
    }
}
// Manual Debug: show only the metadata, not the return-channel sender.
impl fmt::Debug for SegmentSlice {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SegmentSlice").field("meta", &self.meta).finish()
    }
}
impl Drop for SegmentSlice {
    /// If the slice was never explicitly released, hand its metadata back to
    /// the reader so the segment is not lost.
    fn drop(&mut self) {
        if let Some(sender) = self.slice_return_tx.take() {
            let _ = sender.send(Some(self.meta.clone()));
        }
    }
}
/// Bookkeeping for one segment slice: segment name, offset window, and the
/// raw data buffered for event extraction.
#[derive(Clone)]
pub struct SliceMetadata {
    /// Offset at which this slice started reading.
    pub start_offset: i64,
    /// Fully qualified segment name.
    pub scoped_segment: String,
    /// Offset of the last event handed out from this slice.
    pub last_event_offset: i64,
    /// Offset of the next byte to read from the segment.
    pub read_offset: i64,
    /// Upper bound for this slice (i64::MAX when unbounded).
    pub end_offset: i64,
    // Raw bytes buffered from the segment store, pending event extraction.
    segment_data: SegmentDataBuffer,
    /// True when the buffer ends in the middle of an event.
    pub partial_data_present: bool,
}
impl SliceMetadata {
    /// True when the internal buffer holds no bytes at all.
    fn is_empty(&self) -> bool {
        self.segment_data.value.is_empty()
    }

    /// True when more than one event header's worth of data is buffered and no
    /// partial event is pending, i.e. an event can likely be extracted.
    pub fn has_events(&self) -> bool {
        !self.partial_data_present && self.segment_data.value.len() > TYPE_PLUS_LENGTH_SIZE as usize
    }

    /// Clones the offset bookkeeping while leaving the data buffer behind.
    fn copy_meta(&self) -> SliceMetadata {
        SliceMetadata {
            scoped_segment: self.scoped_segment.clone(),
            start_offset: self.start_offset,
            last_event_offset: self.last_event_offset,
            read_offset: self.read_offset,
            end_offset: self.end_offset,
            segment_data: SegmentDataBuffer::empty(),
            partial_data_present: false,
        }
    }
}
// Manual Debug: omit the (potentially large) data buffer.
impl fmt::Debug for SliceMetadata {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SliceMetadata")
            .field("start_offset", &self.start_offset)
            .field("scoped_segment", &self.scoped_segment)
            .field("last_event_offset", &self.last_event_offset)
            .field("read_offset", &self.read_offset)
            .field("end_offset", &self.end_offset)
            .field("partial_data_present", &self.partial_data_present)
            .finish()
    }
}
impl Default for SliceMetadata {
    /// An unnamed slice with zeroed offsets, an unbounded end offset, and an
    /// empty data buffer.
    fn default() -> Self {
        Self {
            start_offset: 0,
            scoped_segment: String::default(),
            last_event_offset: 0,
            read_offset: 0,
            end_offset: i64::MAX,
            segment_data: SegmentDataBuffer::empty(),
            partial_data_present: false,
        }
    }
}
/// A chunk of raw bytes read from a segment, tagged with the segment name and
/// the offset in the segment at which the bytes start.
#[derive(Clone)]
struct SegmentDataBuffer {
    // Fully qualified segment name the bytes came from.
    pub(crate) segment: String,
    // Segment offset of the first byte in `value`.
    pub(crate) offset_in_segment: i64,
    // The buffered bytes themselves.
    pub(crate) value: BytesMut,
}
// Manual Debug: print the buffer's length instead of its contents.
impl fmt::Debug for SegmentDataBuffer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SegmentDataBuffer")
            .field("segment", &self.segment)
            .field("offset in segment", &self.offset_in_segment)
            .field("buffer length", &self.value.len())
            .finish()
    }
}
impl SegmentDataBuffer {
    /// Splits off and returns all buffered bytes; `self` keeps the (now empty)
    /// tail and its offset advances past the returned bytes.
    pub fn split(&mut self) -> SegmentDataBuffer {
        let head = self.value.split();
        let head_offset = self.offset_in_segment;
        self.offset_in_segment = head_offset + head.len() as i64;
        SegmentDataBuffer {
            segment: self.segment.clone(),
            offset_in_segment: head_offset,
            value: head,
        }
    }

    /// Splits off the first `at` bytes into a new buffer starting at the
    /// current offset; `self` keeps the remainder and advances by `at`.
    pub fn split_to(&mut self, at: usize) -> SegmentDataBuffer {
        let head_offset = self.offset_in_segment;
        let head = self.value.split_to(at);
        self.offset_in_segment = head_offset + at as i64;
        SegmentDataBuffer {
            segment: self.segment.clone(),
            offset_in_segment: head_offset,
            value: head,
        }
    }

    /// Reads an i32 from the front of the buffer, keeping the segment offset
    /// in sync with the 4 bytes consumed.
    pub fn get_i32(&mut self) -> i32 {
        let value = self.value.get_i32();
        self.offset_in_segment += 4;
        value
    }

    /// Discards `cnt` bytes from the front of the buffer, keeping the segment
    /// offset in sync.
    pub fn advance(&mut self, cnt: usize) {
        self.value.advance(cnt);
        self.offset_in_segment += cnt as i64;
    }

    /// A buffer with no segment name, zero offset, and zero capacity.
    pub fn empty() -> SegmentDataBuffer {
        SegmentDataBuffer {
            segment: String::default(),
            offset_in_segment: 0,
            value: BytesMut::with_capacity(0),
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::client_factory::ClientFactory;
use crate::event::reader_group_state::ReaderGroupStateError;
use crate::sync::synchronizer::SynchronizerError;
use bytes::{Buf, BufMut, BytesMut};
use mockall::predicate;
use mockall::predicate::*;
use pravega_client_config::{ClientConfigBuilder, MOCK_CONTROLLER_URI};
use pravega_client_shared::{Reader, Scope, ScopedSegment, ScopedStream, Stream};
use pravega_wire_protocol::commands::{Command, EventCommand};
use std::collections::HashMap;
use std::iter;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{mpsc, Mutex};
use tokio::time::{sleep, Duration};
use tracing::Level;
// Reads NUM_EVENTS variable-size events (payloads of 1..=NUM_EVENTS bytes of
// b'a') produced by a mock generator for segment 0, checking both that event
// sizes arrive in order (no drops) and that each payload is uncorrupted.
#[test]
fn test_read_events_single_segment() {
const NUM_EVENTS: usize = 100;
// Capacity-1 channel: the generator blocks until the reader drains each buffer.
let (tx, rx) = mpsc::channel(1);
// NOTE(review): finish() returns a subscriber that is dropped immediately, so
// tracing output is never actually enabled here — try_init() would install it.
tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
let cf = ClientFactory::new(
ClientConfigBuilder::default()
.controller_uri(MOCK_CONTROLLER_URI)
.build()
.unwrap(),
);
let _guard = cf.runtime().enter();
// Feed segment 0 with NUM_EVENTS events packed into 10-byte data buffers.
tokio::spawn(generate_variable_size_events(
tx.clone(),
10,
NUM_EVENTS,
0,
false,
));
let init_segments = vec![create_segment_slice(0), create_segment_slice(1)];
// Mocked reader-group state: reader stays online, no segments to acquire or
// release, and removal on drop succeeds.
let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
rg_mock.expect_check_online().return_const(true);
rg_mock
.expect_compute_segments_to_acquire_or_release()
.return_once(move |_| Ok(0 as isize));
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
let mut reader = EventReader::init_event_reader(
Arc::new(Mutex::new(rg_mock)),
Reader::from("r1".to_string()),
cf.to_async(),
tx.clone(),
rx,
create_slice_map(init_segments),
HashMap::new(),
);
let mut event_count = 0;
let mut event_size = 0;
while let Some(mut slice) = cf.runtime().block_on(reader.acquire_segment()).unwrap() {
loop {
if let Some(event) = slice.next() {
println!("Read event {:?}", event);
// The generator emits strictly increasing sizes, so a size gap
// means an event was skipped.
assert_eq!(event.value.len(), event_size + 1, "Event has been missed");
assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
event_size += 1;
event_count += 1;
} else {
println!(
"Finished reading from segment {:?}, segment is auto released",
slice.meta.scoped_segment
);
break; }
}
if event_count == NUM_EVENTS {
break;
}
}
}
// Exercises the rebalance path: the group state reports one extra segment to
// acquire, so after the (back-dated) rebalance interval elapses the reader
// must pick up segment 1 in addition to segment 0 and read both generators'
// events (2 * NUM_EVENTS total).
#[test]
fn test_acquire_segments() {
const NUM_EVENTS: usize = 10;
let (tx, rx) = mpsc::channel(1);
// NOTE(review): finish() builds a subscriber that is dropped immediately;
// tracing is not actually installed.
tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
let cf = ClientFactory::new(
ClientConfigBuilder::default()
.controller_uri(MOCK_CONTROLLER_URI)
.build()
.unwrap(),
);
let _guard = cf.runtime().enter();
// Data for the initially-owned segment 0.
tokio::spawn(generate_variable_size_events(
tx.clone(),
1024,
NUM_EVENTS,
0,
false,
));
let init_segments = vec![create_segment_slice(0)];
// Group state mock: one segment should be acquired, and assignment hands
// this reader segment 1.
let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
rg_mock
.expect_compute_segments_to_acquire_or_release()
.with(predicate::eq(Reader::from("r1".to_string())))
.return_once(move |_| Ok(1 as isize));
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
rg_mock.expect_check_online().return_const(true);
let res: Result<Option<ScopedSegment>, ReaderGroupStateError> =
Ok(Some(ScopedSegment::from("scope/test/1.#epoch.0")));
rg_mock
.expect_assign_segment_to_reader()
.with(predicate::eq(Reader::from("r1".to_string())))
.return_once(move |_| res);
// After acquisition the reader owns both segments at offset 0.
let mut new_current_segments: HashSet<(ScopedSegment, Offset)> = HashSet::new();
new_current_segments.insert((ScopedSegment::from("scope/test/1.#epoch.0"), Offset::new(0)));
new_current_segments.insert((ScopedSegment::from("scope/test/0.#epoch.0"), Offset::new(0)));
let res: Result<HashSet<(ScopedSegment, Offset)>, SynchronizerError> = Ok(new_current_segments);
rg_mock
.expect_get_segments_for_reader()
.with(predicate::eq(Reader::from("r1".to_string())))
.return_once(move |_| res);
// Data for the newly-acquired segment 1.
tokio::spawn(generate_variable_size_events(
tx.clone(),
1024,
NUM_EVENTS,
1,
false,
));
// 15s in the past exceeds REBALANCE_INTERVAL (10s), so the very next
// acquire_segment call triggers the rebalance immediately.
let before_time = Instant::now() - Duration::from_secs(15);
let mut reader = EventReader::init_event_reader(
Arc::new(Mutex::new(rg_mock)),
Reader::from("r1".to_string()),
cf.to_async(),
tx.clone(),
rx,
create_slice_map(init_segments),
HashMap::new(),
);
reader.set_last_acquire_release_time(before_time);
let mut event_count = 0;
while let Some(mut slice) = cf.runtime().block_on(reader.acquire_segment()).unwrap() {
loop {
if let Some(event) = slice.next() {
println!("Read event {:?}", event);
assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
event_count += 1;
} else {
println!(
"Finished reading from segment {:?}, segment is auto released",
slice.meta.scoped_segment
);
break; }
}
if event_count == NUM_EVENTS + NUM_EVENTS {
break;
}
}
assert_eq!(event_count, NUM_EVENTS + NUM_EVENTS);
}
// Interleaves two generators (segment 1 with artificial delays) and verifies
// the reader can consume 2 * NUM_EVENTS events across both segments without
// corruption, switching slices as data becomes available.
#[test]
fn test_read_events_multiple_segments() {
const NUM_EVENTS: usize = 100;
let (tx, rx) = mpsc::channel(1);
// NOTE(review): finish() drops the subscriber immediately; tracing is not
// actually installed.
tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
let cf = ClientFactory::new(
ClientConfigBuilder::default()
.controller_uri(MOCK_CONTROLLER_URI)
.build()
.unwrap(),
);
let _guard = cf.runtime().enter();
tokio::spawn(generate_variable_size_events(
tx.clone(),
100,
NUM_EVENTS,
0,
false,
));
// Second generator delays between buffers (should_delay = true) so slices
// from segment 1 arrive interleaved with segment 0's.
tokio::spawn(generate_variable_size_events(
tx.clone(),
100,
NUM_EVENTS,
1,
true,
));
let init_segments = vec![create_segment_slice(0), create_segment_slice(1)];
let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
rg_mock
.expect_compute_segments_to_acquire_or_release()
.return_once(move |_| Ok(0 as isize));
rg_mock.expect_check_online().return_const(true);
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
// Periodic offset flushes hit update_reader_positions; succeed silently.
rg_mock
.expect_update_reader_positions()
.return_once(move |_, _| Ok(()));
let mut reader = EventReader::init_event_reader(
Arc::new(Mutex::new(rg_mock)),
Reader::from("r1".to_string()),
cf.to_async(),
tx.clone(),
rx,
create_slice_map(init_segments),
HashMap::new(),
);
let mut event_count_per_segment: HashMap<String, usize> = HashMap::new();
let mut total_events_read = 0;
while let Some(mut slice) = cf.runtime().block_on(reader.acquire_segment()).unwrap() {
let segment = slice.meta.scoped_segment.clone();
println!("Received Segment Slice {:?}", segment);
let mut event_count = 0;
loop {
if let Some(event) = slice.next() {
println!("Read event {:?}", event);
assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
event_count += 1;
} else {
println!(
"Finished reading from segment {:?}, segment is auto released",
slice.meta.scoped_segment
);
break; }
}
total_events_read += event_count;
// NOTE(review): or_insert(event_count) followed by += event_count counts
// the first slice of each segment twice; harmless here because the map is
// never asserted, but worth fixing if it ever is.
*event_count_per_segment
.entry(segment.clone())
.or_insert(event_count) += event_count;
if total_events_read == NUM_EVENTS * 2 {
break;
}
}
}
// Verifies release_segment: a slice returned to the reader without consuming
// everything is handed back on the next acquire, resuming exactly after the
// last event read (8-byte header + 1-byte payload => offset 9).
#[test]
fn test_return_slice() {
const NUM_EVENTS: usize = 2;
let (tx, rx) = mpsc::channel(1);
// NOTE(review): finish() drops the subscriber immediately; tracing is not
// actually installed.
tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
let cf = ClientFactory::new(
ClientConfigBuilder::default()
.controller_uri(MOCK_CONTROLLER_URI)
.build()
.unwrap(),
);
let _guard = cf.runtime().enter();
tokio::spawn(generate_variable_size_events(
tx.clone(),
10,
NUM_EVENTS,
0,
false,
));
let init_segments = vec![create_segment_slice(0), create_segment_slice(1)];
let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
rg_mock.expect_check_online().return_const(true);
rg_mock
.expect_compute_segments_to_acquire_or_release()
.return_once(move |_| Ok(0 as isize));
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
let mut reader = EventReader::init_event_reader(
Arc::new(Mutex::new(rg_mock)),
Reader::from("r1".to_string()),
cf.to_async(),
tx.clone(),
rx,
create_slice_map(init_segments),
HashMap::new(),
);
// Read the first (1-byte) event, then hand the slice back mid-stream.
let mut slice = cf
.runtime()
.block_on(reader.acquire_segment())
.expect("Failed to acquire segment since the reader is offline")
.unwrap();
let event = slice.next().unwrap();
assert_eq!(event.value.len(), 1);
assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
assert_eq!(event.offset_in_segment, 0); let _ = cf.runtime().block_on(reader.release_segment(slice));
// Acquire and release once more without reading; progress must be kept.
let slice = cf
.runtime()
.block_on(reader.acquire_segment())
.expect("Failed to acquire segment since the reader is offline")
.unwrap();
let _ = cf.runtime().block_on(reader.release_segment(slice));
// The returned slice resumes at the second event: 8-byte header + 1 byte.
let mut slice = cf
.runtime()
.block_on(reader.acquire_segment())
.expect("Failed to acquire segment")
.unwrap();
let event = slice.next().unwrap();
assert_eq!(event.value.len(), 2);
assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
assert_eq!(event.offset_in_segment, 8 + 1); }
// Verifies release_segment_at: returning a slice at offset 0 rewinds the
// reader, so a fresh generator replaying the segment yields the first event
// (at offset 0) again. Constant-size events are 9 bytes each (8-byte header +
// 1-byte payload), so the second event sits at offset 9 before the rewind.
#[test]
fn test_return_slice_at_offset() {
const NUM_EVENTS: usize = 2;
let (tx, rx) = mpsc::channel(1);
// stop_tx lets the test halt the first generator before restarting reads.
let (stop_tx, stop_rx) = oneshot::channel();
// NOTE(review): finish() drops the subscriber immediately; tracing is not
// actually installed.
tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
let cf = ClientFactory::new(
ClientConfigBuilder::default()
.controller_uri(MOCK_CONTROLLER_URI)
.build()
.unwrap(),
);
let _guard = cf.runtime().enter();
tokio::spawn(generate_constant_size_events(
tx.clone(),
20,
NUM_EVENTS,
0,
false,
stop_rx,
));
let mut stop_reading_map: HashMap<ScopedSegment, oneshot::Sender<()>> = HashMap::new();
stop_reading_map.insert(ScopedSegment::from("scope/test/0.#epoch.0"), stop_tx);
let init_segments = vec![create_segment_slice(0), create_segment_slice(1)];
let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
rg_mock.expect_check_online().return_const(true);
rg_mock
.expect_compute_segments_to_acquire_or_release()
.return_once(move |_| Ok(0 as isize));
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
let mut reader = EventReader::init_event_reader(
Arc::new(Mutex::new(rg_mock)),
Reader::from("r1".to_string()),
cf.to_async(),
tx.clone(),
rx,
create_slice_map(init_segments),
stop_reading_map,
);
// Consume both constant-size events, confirming offsets 0 and 9.
let mut slice = cf
.runtime()
.block_on(reader.acquire_segment())
.expect("Failed to acquire segment")
.unwrap();
let event = slice.next().unwrap();
assert_eq!(event.value.len(), 1);
assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
assert_eq!(event.offset_in_segment, 0); let result = slice.next();
assert!(result.is_some());
let event = result.unwrap();
assert_eq!(event.value.len(), 1);
assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
assert_eq!(event.offset_in_segment, 9); let _ = cf.runtime().block_on(reader.release_segment_at(slice, 0));
// Rewound to offset 0: spin up a new generator replaying the segment.
let (_stop_tx, stop_rx) = oneshot::channel();
tokio::spawn(generate_constant_size_events(
tx.clone(),
20,
NUM_EVENTS,
0,
false,
stop_rx,
));
// The next acquire must start over at the first event.
let mut slice = cf
.runtime()
.block_on(reader.acquire_segment())
.expect("Failed to acquire segment")
.unwrap();
let event = slice.next().unwrap();
assert_eq!(event.value.len(), 1);
assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
assert_eq!(event.offset_in_segment, 0); }
// Feeds raw 10-byte network buffers straight into a SegmentSlice and checks
// the slice reassembles events whose headers/payloads straddle buffer
// boundaries: 20 events with payload sizes 1..=20, each prefixed by an
// 8-byte header.
#[tokio::test]
async fn test_read_partial_events_buffer_10() {
let (tx, mut rx) = mpsc::channel(1);
tokio::spawn(generate_variable_size_events(tx, 10, 20, 0, false));
let mut segment_slice = create_segment_slice(0);
let mut expected_offset: usize = 0;
let mut expected_event_len = 0;
loop {
// Refill the slice's backing buffer only when it has been drained.
if segment_slice.is_empty() {
if let Some(response) = rx.recv().await {
segment_slice
.meta
.segment_data
.value
.put(response.expect("get response").value);
} else {
break; }
}
// Drain every complete event currently buffered, validating offset,
// length progression, and payload integrity.
while let Some(d) = segment_slice.next() {
assert_eq!(expected_offset, d.offset_in_segment as usize);
assert_eq!(expected_event_len + 1, d.value.len());
assert!(is_all_same(d.value.as_slice()));
// Each event advances the offset by its 8-byte header + payload.
expected_offset += 8 + expected_event_len + 1;
expected_event_len += 1;
}
}
assert_eq!(20, expected_event_len);
}
// Same scenario as test_read_partial_events_buffer_10 but with 100-byte
// buffers and 200 events, so later (larger) events also span multiple
// buffers in the other direction.
#[tokio::test]
async fn test_read_partial_events_buffer_100() {
let (tx, mut rx) = mpsc::channel(1);
tokio::spawn(generate_variable_size_events(tx, 100, 200, 0, false));
let mut segment_slice = create_segment_slice(0);
let mut expected_offset: usize = 0;
let mut expected_event_len = 0;
loop {
// Refill only once the slice's buffered data is exhausted.
if segment_slice.is_empty() {
if let Some(response) = rx.recv().await {
segment_slice
.meta
.segment_data
.value
.put(response.expect("get response").value);
} else {
break; }
}
while let Some(d) = segment_slice.next() {
assert_eq!(expected_offset, d.offset_in_segment as usize);
assert_eq!(expected_event_len + 1, d.value.len());
assert!(is_all_same(d.value.as_slice()));
// 8-byte header + payload of (expected_event_len + 1) bytes.
expected_offset += 8 + expected_event_len + 1;
expected_event_len += 1;
}
}
assert_eq!(200, expected_event_len);
}
/// Serializes one wire-format event: i32 type code, i32 payload length,
/// then `len` bytes of b'a'.
fn generate_event_data(len: usize) -> BytesMut {
    let mut buf = BytesMut::with_capacity(len + 8);
    buf.put_i32(EventCommand::TYPE_CODE);
    buf.put_i32(len as i32);
    let payload = vec![b'a'; len];
    buf.put(payload.as_slice());
    buf
}
// Hand-crafted stream of SegmentDataBuffers for segment test/test/123 where
// event headers and payloads are deliberately split across buffer boundaries,
// to exercise partial-event reassembly. Each event is an i32 length followed
// by that many b'a' bytes (no type code).
// NOTE(review): every buffer reports offset_in_segment 0 — presumably the
// consumer under test ignores it; verify against the caller.
async fn generate_multiple_constant_size_events(tx: Sender<SegmentDataBuffer>) {
let mut buf = BytesMut::with_capacity(10);
let segment = ScopedSegment::from("test/test/123").to_string();
// Buffer 1: two complete events (len 1, len 2).
buf.put_i32(1);
buf.put_u8(b'a');
buf.put_i32(2);
buf.put(&b"aa"[..]);
tx.send(SegmentDataBuffer {
segment: segment.clone(),
offset_in_segment: 0,
value: buf,
})
.await
.unwrap();
// Buffers 2-5: one complete event each (len 3, 4, 5, 6).
buf = BytesMut::with_capacity(10);
buf.put_i32(3);
buf.put(&b"aaa"[..]);
tx.send(SegmentDataBuffer {
segment: segment.clone(),
offset_in_segment: 0,
value: buf,
})
.await
.unwrap();
buf = BytesMut::with_capacity(10);
buf.put_i32(4);
buf.put(&b"aaaa"[..]);
tx.send(SegmentDataBuffer {
segment: segment.clone(),
offset_in_segment: 0,
value: buf,
})
.await
.unwrap();
buf = BytesMut::with_capacity(10);
buf.put_i32(5);
buf.put(&b"aaaaa"[..]);
tx.send(SegmentDataBuffer {
segment: segment.clone(),
offset_in_segment: 0,
value: buf,
})
.await
.unwrap();
buf = BytesMut::with_capacity(10);
buf.put_i32(6);
buf.put(&b"aaaaaa"[..]);
tx.send(SegmentDataBuffer {
segment: segment.clone(),
offset_in_segment: 0,
value: buf,
})
.await
.unwrap();
// Buffer 6: event 7's header claims 7 bytes but only 6 follow — its last
// payload byte arrives at the start of the next buffer.
buf = BytesMut::with_capacity(10);
buf.put_i32(7);
buf.put(&b"aaaaaa"[..]);
tx.send(SegmentDataBuffer {
segment: segment.clone(),
offset_in_segment: 0,
value: buf,
})
.await
.unwrap();
// Buffer 7: trailing byte of event 7, then event 8's header plus 5 of its
// 8 payload bytes.
buf = BytesMut::with_capacity(10);
buf.put_u8(b'a');
buf.put_i32(8);
buf.put(&b"aaaaa"[..]);
tx.send(SegmentDataBuffer {
segment: segment.clone(),
offset_in_segment: 0,
value: buf,
})
.await
.unwrap();
// Buffer 8: the remaining 3 bytes of event 8's payload.
buf = BytesMut::with_capacity(10);
buf.put(&b"aaa"[..]);
tx.send(SegmentDataBuffer {
segment: segment.clone(),
offset_in_segment: 0,
value: buf,
})
.await
.unwrap();
}
/// Sends ten buffers for segment `test/test/123`, the i-th holding a single
/// record: an i32 length header `i` followed by `i` bytes of b'a' (no type
/// code). Returns quietly if the receiver side has been dropped.
async fn generate_multiple_variable_sized_events(tx: Sender<SegmentDataBuffer>) {
    for i in 1..11 {
        let mut buf = BytesMut::with_capacity(32);
        buf.put_i32(i);
        for _ in 0..i {
            buf.put(&b"a"[..]);
        }
        let data = SegmentDataBuffer {
            segment: ScopedSegment::from("test/test/123").to_string(),
            offset_in_segment: 0,
            value: buf,
        };
        // `if let Err(_) = …` was redundant pattern matching; `is_err()` says
        // the same thing directly (clippy::redundant_pattern_matching).
        if tx.send(data).await.is_err() {
            warn!("receiver dropped");
            return;
        }
    }
}
// Peeks the next record's 4-byte length header from `data` and returns an
// empty buffer pre-sized for that payload; None when fewer than 4 bytes are
// buffered.
// NOTE(review): `Buf::bytes()` is the pre-1.0 `bytes` crate API (renamed
// `chunk()` in 1.0). The i32 is read from a copy of the slice view, so
// `data.value` itself is NOT advanced — confirm callers rely on that.
fn custom_read_header(data: &mut SegmentDataBuffer) -> Option<SegmentDataBuffer> {
if data.value.remaining() >= 4 {
let mut temp = data.value.bytes();
let len = temp.get_i32();
Some(SegmentDataBuffer {
segment: data.segment.clone(),
offset_in_segment: 0,
value: BytesMut::with_capacity(len as usize),
})
} else {
None
}
}
/// Pulls up to `events_to_read` events off `slice`, asserting each payload is
/// uncorrupted; stops early if the slice runs out of buffered events.
fn read_n_events(slice: &mut SegmentSlice, events_to_read: usize) {
    for _ in 0..events_to_read {
        match slice.next() {
            Some(event) => {
                println!("Read event {:?}", event);
                assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
            }
            None => {
                println!(
                    "Finished reading from segment {:?}, segment is auto released",
                    slice.meta.scoped_segment
                );
                break;
            }
        }
    }
}
/// Indexes the initial slices' metadata by scoped segment, as expected by
/// EventReader::init_event_reader.
fn create_slice_map(init_segments: Vec<SegmentSlice>) -> HashMap<ScopedSegment, SliceMetadata> {
    let mut map = HashMap::with_capacity(init_segments.len());
    for s in init_segments {
        // Borrow the segment name directly; the previous
        // `scoped_segment.clone().as_str()` allocated a String only to
        // immediately borrow and drop it (clippy::redundant_clone).
        map.insert(
            ScopedSegment::from(s.meta.scoped_segment.as_str()),
            s.meta.clone(),
        );
    }
    map
}
/// Convenience constructor: builds a ScopedStream from raw scope/stream names.
fn get_scoped_stream(scope: &str, stream: &str) -> ScopedStream {
    ScopedStream {
        scope: Scope {
            name: scope.to_owned(),
        },
        stream: Stream {
            name: stream.to_owned(),
        },
    }
}
// Streams `num_events` single-byte events for segment `scope/test/<id>`,
// packed into fixed `buf_size` buffers. Events that do not fit are split
// across consecutive buffers; a full buffer is flushed (optionally after a
// delay) before packing continues. The oneshot `stop_generation` channel
// aborts generation early; whatever is buffered at the end is always flushed.
// NOTE(review): this segment name has no ".#epoch.0" suffix, unlike
// generate_variable_size_events — confirm that is intentional.
async fn generate_constant_size_events(
tx: Sender<SegmentReadResult>,
buf_size: usize,
num_events: usize,
segment_id: usize,
should_delay: bool,
mut stop_generation: oneshot::Receiver<()>,
) {
let mut segment_name = "scope/test/".to_owned();
segment_name.push_str(segment_id.to_string().as_ref());
let mut buf = BytesMut::with_capacity(buf_size);
// Running offset of the first byte of each flushed buffer.
let mut offset: i64 = 0;
for _i in 1..num_events + 1 {
// Stop if the test signalled us, or the sender side was dropped.
if let Ok(_) | Err(TryRecvError::Closed) = stop_generation.try_recv() {
break;
}
let mut data = generate_event_data(1); if data.len() < buf.capacity() - buf.len() {
buf.put(data);
} else {
// Event doesn't fit in the remaining space: flush full buffers and
// split the event across them until it is fully written.
while data.len() > 0 {
let free_space = buf.capacity() - buf.len();
if free_space == 0 {
if should_delay {
sleep(Duration::from_millis(100)).await;
}
tx.send(Ok(SegmentDataBuffer {
segment: ScopedSegment::from(segment_name.as_str()).to_string(),
offset_in_segment: offset,
value: buf,
}))
.await
.unwrap();
offset += buf_size as i64;
buf = BytesMut::with_capacity(buf_size);
} else if free_space >= data.len() {
buf.put(data.split());
} else {
buf.put(data.split_to(free_space));
}
}
}
}
// Flush the trailing (possibly partial) buffer.
tx.send(Ok(SegmentDataBuffer {
segment: ScopedSegment::from(segment_name.as_str()).to_string(),
offset_in_segment: offset,
value: buf,
}))
.await
.unwrap();
}
// Streams `num_events` events with growing payloads (1..=num_events bytes)
// for segment `scope/test/<id>.#epoch.0`, packed into fixed `buf_size`
// buffers; events are split across buffer boundaries when they don't fit.
// When `should_delay` is set, a 100ms pause precedes each full-buffer flush
// so the consumer sees interleaved arrivals. The final partial buffer is
// always flushed.
async fn generate_variable_size_events(
tx: Sender<SegmentReadResult>,
buf_size: usize,
num_events: usize,
segment_id: usize,
should_delay: bool,
) {
let mut segment_name = "scope/test/".to_owned();
segment_name.push_str(segment_id.to_string().as_ref());
segment_name.push_str(".#epoch.0");
let mut buf = BytesMut::with_capacity(buf_size);
// Running offset of the first byte of each flushed buffer.
let mut offset: i64 = 0;
for i in 1..num_events + 1 {
let mut data = generate_event_data(i);
if data.len() < buf.capacity() - buf.len() {
buf.put(data);
} else {
// Event doesn't fit: flush full buffers, splitting the serialized
// event across them until every byte has been written.
while data.len() > 0 {
let free_space = buf.capacity() - buf.len();
if free_space == 0 {
if should_delay {
sleep(Duration::from_millis(100)).await;
}
tx.send(Ok(SegmentDataBuffer {
segment: ScopedSegment::from(segment_name.as_str()).to_string(),
offset_in_segment: offset,
value: buf,
}))
.await
.unwrap();
offset += buf_size as i64;
buf = BytesMut::with_capacity(buf_size);
} else if free_space >= data.len() {
buf.put(data.split());
} else {
buf.put(data.split_to(free_space));
}
}
}
}
// Flush the trailing (possibly partial) buffer.
tx.send(Ok(SegmentDataBuffer {
segment: ScopedSegment::from(segment_name.as_str()).to_string(),
offset_in_segment: offset,
value: buf,
}))
.await
.unwrap();
}
/// Builds a fresh SegmentSlice for segment `scope/test/<segment_id>` spanning
/// the whole segment (offset 0 through i64::MAX) with an empty data buffer
/// and no return channel.
fn create_segment_slice(segment_id: i64) -> SegmentSlice {
    let segment = ScopedSegment::from(format!("scope/test/{}", segment_id).as_str());
    SegmentSlice {
        meta: SliceMetadata {
            start_offset: 0,
            scoped_segment: segment.to_string(),
            last_event_offset: 0,
            read_offset: 0,
            end_offset: i64::MAX,
            segment_data: SegmentDataBuffer::empty(),
            partial_data_present: false,
        },
        slice_return_tx: None,
    }
}
/// True when every element equals the first one; vacuously true for an
/// empty slice.
fn is_all_same<T: Eq>(slice: &[T]) -> bool {
    match slice.split_first() {
        None => true,
        Some((head, tail)) => tail.iter().all(|item| item == head),
    }
}
}