use crate::client_factory::ClientFactoryAsync;
use crate::index::writer::INDEX_RECORD_SIZE_ATTRIBUTE_ID;
use crate::index::{IndexRecord, RECORD_SIZE};
use crate::segment::metadata::SegmentMetadataClient;
use crate::segment::raw_client::RawClient;
use crate::segment::reader::{AsyncSegmentReader, AsyncSegmentReaderImpl};
use crate::util::get_request_id;

use pravega_client_shared::{ScopedSegment, ScopedStream};
use pravega_wire_protocol::commands::GetSegmentAttributeCommand;
use pravega_wire_protocol::wire_commands::{Replies, Requests};

use async_stream::try_stream;
use futures::stream::Stream;
use snafu::{ensure, Snafu};
use std::io::SeekFrom;
use tracing::info;
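
/// Errors that can be returned by the index reader.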
#[derive(Debug, Snafu)]
#[snafu(visibility = "pub")]
pub enum IndexReaderError {
    #[snafu(display("Field {} does not exist", msg))]
    FieldNotFound { msg: String },
    #[snafu(display("Invalid offset: {}", msg))]
    InvalidOffset { msg: String },
    #[snafu(display("Internal error: {}", msg))]
    Internal { msg: String },
}
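
/// A reader for an index stream.
///
/// An index stream is a special single-segment stream whose events are fixed-size index records,
/// so any record can be located by offset arithmetic and searched with binary search.
///
/// The example below is an illustrative sketch only: the controller endpoint `localhost:9090`
/// and the index stream `myscope/mystream` are assumptions, not part of this crate.
///
/// ```no_run
/// use pravega_client::client_factory::ClientFactory;
/// use pravega_client_config::ClientConfigBuilder;
/// use pravega_client_shared::{PravegaNodeUri, ScopedStream};
///
/// fn main() {
///     let config = ClientConfigBuilder::default()
///         .controller_uri(PravegaNodeUri::from("localhost:9090".to_string()))
///         .build()
///         .expect("build client config");
///     let factory = ClientFactory::new(config);
///     // Assumes an index stream "myscope/mystream" already exists and has been written to.
///     let stream = ScopedStream::from("myscope/mystream");
///     factory.runtime().block_on(async {
///         let index_reader = factory.create_index_reader(stream).await;
///         let data = index_reader.last_record_data().await.expect("read last record");
///         println!("last index record carries {} bytes of data", data.len());
///     });
/// }
/// ```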
pub struct IndexReader {
    stream: ScopedStream,
    factory: ClientFactoryAsync,
    meta: SegmentMetadataClient,
    segment_reader: AsyncSegmentReaderImpl,
    record_size: usize,
}
impl IndexReader {
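    /// Creates an `IndexReader` for the given index stream.
    ///
    /// The index stream must consist of exactly one segment. The record size is taken from the
    /// segment attribute set by the index writer; if the attribute is unset, the default
    /// `RECORD_SIZE` is used.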
    pub(crate) async fn new(factory: ClientFactoryAsync, stream: ScopedStream) -> Self {
        let segments = factory
            .controller_client()
            .get_head_segments(&stream)
            .await
            .expect("get head segments");
        assert_eq!(
            segments.len(),
            1,
            "Index stream should be configured with exactly one segment"
        );
        let segment = segments.iter().next().unwrap().0.clone();
        let scoped_segment = ScopedSegment {
            scope: stream.scope.clone(),
            stream: stream.stream.clone(),
            segment,
        };
        let segment_reader = factory.create_async_segment_reader(scoped_segment.clone()).await;
        let meta = factory
            .create_segment_metadata_client(scoped_segment.clone())
            .await;
        let controller_client = factory.controller_client();
        let endpoint = controller_client
            .get_endpoint_for_segment(&scoped_segment)
            .await
            .expect("get endpoint for segment");
        let raw_client = factory.create_raw_client_for_endpoint(endpoint);
        let segment_name = scoped_segment.to_string();
        let token = controller_client
            .get_or_refresh_delegation_token_for(stream.clone())
            .await
            .expect("controller error when refreshing token");
        let request = Requests::GetSegmentAttribute(GetSegmentAttributeCommand {
            request_id: get_request_id(),
            segment_name: segment_name.clone(),
            attribute_id: INDEX_RECORD_SIZE_ATTRIBUTE_ID,
            delegation_token: token,
        });
        let reply = raw_client
            .send_request(&request)
            .await
            .expect("get segment attribute");
        let record_size = match reply {
            Replies::SegmentAttribute(cmd) => {
                if cmd.value == i64::MIN {
                    // The attribute was never set by an index writer; fall back to the default.
                    info!(
                        "record_size segment attribute for segment {} is not set, falling back to default RECORD_SIZE {:?}",
                        segment_name, RECORD_SIZE
                    );
                    RECORD_SIZE as usize
                } else {
                    info!(
                        "record_size segment attribute for segment {} is already set to {:?}",
                        segment_name, cmd.value
                    );
                    cmd.value as usize
                }
            }
            _ => {
                panic!("get segment attribute for record_size failed due to {:?}", reply);
            }
        };
        IndexReader {
            stream,
            factory,
            meta,
            segment_reader,
            record_size,
        }
    }
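
    /// Given a field (name, target value) pair, returns the offset of the first record whose
    /// value for that field is greater than or equal to the target value. Returns
    /// `FieldNotFound` if no record's value for the field reaches the target.
    ///
    /// This runs a binary search over the fixed-size records between the current head and tail
    /// of the index segment, so it relies on the field values being monotonically increasing in
    /// write order. A hypothetical call, assuming records were written with a `"ts"` field:
    /// `let offset = index_reader.search_offset(("ts", 1000)).await?;`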
    pub async fn search_offset(&self, field: (&'static str, u64)) -> Result<u64, IndexReaderError> {
        let record_size_signed: i64 = self.record_size as i64;
        let target_key = IndexRecord::hash_key_to_u128(field.0);
        let target_value = field.1;
        let head = self.head_offset().await.map_err(|e| IndexReaderError::Internal {
            msg: format!("error when fetching head offset: {:?}", e),
        })? as i64;
        let tail = self.tail_offset().await.map_err(|e| IndexReaderError::Internal {
            msg: format!("error when fetching tail offset: {:?}", e),
        })? as i64;
        let mut start = 0;
        let num_of_record = (tail - head) / record_size_signed;
        let mut end = num_of_record - 1;
        // Binary search for the earliest record whose value for the target field reaches
        // target_value.
        while start <= end {
            let mid = start + (end - start) / 2;
            let record = self
                .read_record_from_random_offset((head + mid * record_size_signed) as u64)
                .await?;
            if let Some(e) = record.fields.iter().find(|&e| e.0 == target_key) {
                if e.1 >= target_value {
                    // Candidate match; keep searching the lower half for an earlier one.
                    end = mid - 1;
                } else {
                    start = mid + 1;
                }
            } else {
                // The field is not present in this record; continue in the upper half.
                start = mid + 1;
            }
        }
        if start == num_of_record {
            Err(IndexReaderError::FieldNotFound {
                msg: format!("key/value: {}/{}", field.0, field.1),
            })
        } else {
            Ok((head + start * record_size_signed) as u64)
        }
    }
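
    /// Reads index records in the offset range `[start_offset, end_offset)` and returns an
    /// asynchronous stream of their data payloads. Both offsets must be aligned to the record
    /// size; pass `u64::MAX` as `end_offset` to read without an upper bound.
    ///
    /// A consumption sketch; the controller endpoint and stream name are illustrative
    /// assumptions:
    ///
    /// ```no_run
    /// use futures::pin_mut;
    /// use futures::StreamExt;
    /// use pravega_client::client_factory::ClientFactory;
    /// use pravega_client_config::ClientConfigBuilder;
    /// use pravega_client_shared::{PravegaNodeUri, ScopedStream};
    ///
    /// fn main() {
    ///     let config = ClientConfigBuilder::default()
    ///         .controller_uri(PravegaNodeUri::from("localhost:9090".to_string()))
    ///         .build()
    ///         .expect("build client config");
    ///     let factory = ClientFactory::new(config);
    ///     let stream = ScopedStream::from("myscope/mystream");
    ///     factory.runtime().block_on(async {
    ///         let index_reader = factory.create_index_reader(stream).await;
    ///         let head = index_reader.head_offset().await.expect("head offset");
    ///         let tail = index_reader.tail_offset().await.expect("tail offset");
    ///         // Read every currently available record.
    ///         let data_stream = index_reader.read(head, tail).expect("create record stream");
    ///         pin_mut!(data_stream);
    ///         while let Some(data) = data_stream.next().await {
    ///             println!("record data: {} bytes", data.expect("read record").len());
    ///         }
    ///     });
    /// }
    /// ```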
    pub fn read<'stream, 'reader: 'stream>(
        &'reader self,
        start_offset: u64,
        end_offset: u64,
    ) -> Result<impl Stream<Item = Result<Vec<u8>, IndexReaderError>> + 'stream, IndexReaderError> {
        ensure!(
            start_offset % (self.record_size as u64) == 0,
            InvalidOffset {
                msg: format!(
                    "Start offset {} is invalid: it is not a multiple of the record size {}",
                    start_offset, self.record_size
                )
            }
        );
        if end_offset != u64::MAX {
            ensure!(
                end_offset % (self.record_size as u64) == 0,
                InvalidOffset {
                    msg: format!(
                        "End offset {} is invalid: it is not a multiple of the record size {}",
                        end_offset, self.record_size
                    )
                }
            );
        }
        Ok(try_stream! {
            let stream = self.stream.clone();
            let record_size = self.record_size;
            let mut byte_reader = self.factory.create_byte_reader(stream).await;
            let mut num_of_records_to_read = if end_offset == u64::MAX {
                u64::MAX
            } else {
                (end_offset - start_offset) / (record_size as u64)
            };
            byte_reader.seek(SeekFrom::Start(start_offset))
                .await
                .map_err(|e| IndexReaderError::InvalidOffset {
                    msg: format!("invalid seeking offset {:?}", e)
                })?;
            loop {
                let mut buf = vec![];
                let mut size_to_read = record_size;
                // A single read may return fewer bytes than a full record, so keep reading
                // until exactly one record worth of bytes has been accumulated.
                while size_to_read != 0 {
                    let mut tmp_buf = vec![0; size_to_read];
                    let size = byte_reader
                        .read(&mut tmp_buf)
                        .await
                        .map_err(|e| IndexReaderError::Internal {
                            msg: format!("byte reader read error {:?}", e),
                        })?;
                    buf.extend_from_slice(&tmp_buf[..size]);
                    size_to_read -= size;
                }
                let record = IndexRecord::read_from(&buf).map_err(|e| IndexReaderError::Internal {
                    msg: format!("deserialize record {:?}", e),
                })?;
                yield record.data;
                if num_of_records_to_read != u64::MAX {
                    num_of_records_to_read -= 1;
                }
                if num_of_records_to_read == 0 {
                    break;
                }
            }
        })
    }
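
    /// Returns the data payload of the first readable record, i.e. the record at the current
    /// head of the index segment.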
    pub async fn first_record_data(&self) -> Result<Vec<u8>, IndexReaderError> {
        let head_offset = self.head_offset().await?;
        let first_record = self.read_record_from_random_offset(head_offset).await?;
        Ok(first_record.data)
    }
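
    /// Returns the data payload of the last record in the index segment.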
    pub async fn last_record_data(&self) -> Result<Vec<u8>, IndexReaderError> {
        let last_offset = self.tail_offset().await?;
        let last_record_offset = last_offset - self.record_size as u64;
        let last_record = self.read_record_from_random_offset(last_record_offset).await?;
        Ok(last_record.data)
    }
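
    /// Returns the current readable head offset of the index segment.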
    pub async fn head_offset(&self) -> Result<u64, IndexReaderError> {
        self.meta
            .fetch_current_starting_head()
            .await
            .map(|i| i as u64)
            .map_err(|e| IndexReaderError::Internal {
                msg: format!("failed to get head offset: {:?}", e),
            })
    }
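
    /// Returns the current tail offset, i.e. the length of the index segment.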
    pub async fn tail_offset(&self) -> Result<u64, IndexReaderError> {
        self.meta
            .fetch_current_segment_length()
            .await
            .map(|i| i as u64)
            .map_err(|e| IndexReaderError::Internal {
                msg: format!("failed to get tail offset: {:?}", e),
            })
    }
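
    /// Reads a single record starting at the given offset. The offset is expected to be aligned
    /// to the record size; no alignment check is performed here.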
    pub(crate) async fn read_record_from_random_offset(
        &self,
        offset: u64,
    ) -> Result<IndexRecord, IndexReaderError> {
        let segment_read_cmd = self
            .segment_reader
            .read(offset as i64, self.record_size as i32)
            .await
            .map_err(|e| IndexReaderError::Internal {
                msg: format!("segment reader error: {:?}", e),
            })?;
        let record =
            IndexRecord::read_from(&segment_read_cmd.data).map_err(|e| IndexReaderError::Internal {
                msg: format!("record deserialization error: {:?}", e),
            })?;
        Ok(record)
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::client_factory::ClientFactory;
    use crate::util::create_stream;
    use pravega_client_config::connection_type::{ConnectionType, MockType};
    use pravega_client_config::ClientConfigBuilder;
    use pravega_client_shared::PravegaNodeUri;

    // The mock connection replies with WrongHost, so constructing the index reader panics.
    #[test]
    #[should_panic(expected = "get segment attribute for record_size failed due to WrongHost")]
    fn test_index_reader_wrong_host() {
        let config = ClientConfigBuilder::default()
            .connection_type(ConnectionType::Mock(MockType::WrongHost))
            .mock(true)
            .controller_uri(PravegaNodeUri::from("127.0.0.2:9091".to_string()))
            .build()
            .unwrap();
        let factory = ClientFactory::new(config);
        factory.runtime().block_on(create_stream(
            &factory,
            "testScopeInvalid",
            "testStreamInvalid",
            1,
        ));
        let stream = ScopedStream::from("testScopeInvalid/testStreamInvalid");
        factory
            .runtime()
            .block_on(factory.create_index_reader(stream.clone()));
    }

    // With the happy mock the record_size attribute is unset, so the reader falls back to the
    // default RECORD_SIZE.
    #[test]
    fn test_default_index_reader_size() {
        let config = ClientConfigBuilder::default()
            .connection_type(ConnectionType::Mock(MockType::Happy))
            .mock(true)
            .controller_uri(PravegaNodeUri::from("127.0.0.2:9091".to_string()))
            .build()
            .unwrap();
        let factory = ClientFactory::new(config);
        factory.runtime().block_on(create_stream(
            &factory,
            "testScopeInvalid",
            "testStreamInvalid",
            1,
        ));
        let stream = ScopedStream::from("testScopeInvalid/testStreamInvalid");
        let reply = factory
            .runtime()
            .block_on(factory.create_index_reader(stream.clone()));
        assert_eq!(reply.record_size, RECORD_SIZE as usize);
    }
}