Struct pravega_client::event::EventReader
pub struct EventReader {
    pub id: Reader,
    /* private fields */
}
Reads events from a Stream.
An event reader fetches data from its assigned segments as a SegmentSlice, where a SegmentSlice represents data from a Pravega Segment. It provides the following APIs:
- A method to initialize the event reader: EventReader#init.
- A method to obtain a SegmentSlice to read events from a Pravega Segment: EventReader#acquire_segment. The user can use the SegmentSlice’s iterator API to fetch individual events from a given SegmentSlice.
- A method to release the Segment back at the given offset: EventReader#release_segment_at. This method needs to be invoked only if the user does not consume all the events in a SegmentSlice.
- A method to mark the reader as offline: EventReader#reader_offline. This method ensures that the segments owned by this reader are transferred to other readers in the reader group.
§Examples
use pravega_client_config::{ClientConfigBuilder, MOCK_CONTROLLER_URI};
use pravega_client::client_factory::ClientFactory;
use pravega_client_shared::{ScopedStream, Scope, Stream};

#[tokio::main]
async fn main() {
    let config = ClientConfigBuilder::default()
        .controller_uri(MOCK_CONTROLLER_URI)
        .build()
        .expect("creating config");
    let client_factory = ClientFactory::new(config);
    let stream = ScopedStream {
        scope: Scope::from("scope".to_string()),
        stream: Stream::from("stream".to_string()),
    };
    // Create a reader group to read data from the Pravega stream.
    let rg = client_factory.create_reader_group("rg".to_string(), stream).await;
    // Create a reader under the reader group. The segments of the stream are
    // assigned among the readers that are part of the reader group.
    let mut reader1 = rg.create_reader("r1".to_string()).await;

    // Read all events from a given segment slice.
    if let Some(mut segment_slice) = reader1
        .acquire_segment()
        .await
        .expect("Failed to acquire segment since the reader is offline")
    {
        while let Some(event) = segment_slice.next() {
            println!("Event read is {:?}", event);
        }
    }

    // Read one event from a given segment slice, then return the slice.
    if let Some(mut segment_slice) = reader1
        .acquire_segment()
        .await
        .expect("Failed to acquire segment since the reader is offline")
    {
        if let Some(event) = segment_slice.next() {
            println!("Event read is {:?}", event);
            // Release the partially read segment slice back to the reader.
            reader1
                .release_segment(segment_slice)
                .await
                .expect("Failed to release segment slice");
        }
    }
}
Fields§
§id: Reader
Implementations§
impl EventReader
pub async fn release_segment(
    &mut self,
    slice: SegmentSlice
) -> Result<(), EventReaderError>
Releases a partially read segment slice back to the event reader.
Note: this method may return an error indicating that the reader has already been removed. This means that another thread has removed this reader from the ReaderGroup, probably because the host of this reader was assumed dead.
pub async fn release_segment_at(
    &mut self,
    slice: SegmentSlice,
    offset: i64
) -> Result<(), EventReaderError>
Releases a segment slice back to the reader and also indicates the offset up to which the segment slice has been consumed.
Note: this method may return an error indicating that the reader has already been removed. This means that another thread has removed this reader from the ReaderGroup, probably because the host of this reader was assumed dead.
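The idea of releasing a slice at an offset can be illustrated with a toy model. The sketch below does not use pravega_client at all: ToyReader, ToySlice, acquire, and release_at are hypothetical stand-ins, and the offset is assumed to be the byte position just past the last consumed event. It only shows the intended effect, namely that events after the released offset are served again by a later acquire.

```rust
// Toy model only: none of these types or methods come from pravega_client.
struct ToyReader {
    data: Vec<&'static str>, // events in segment order
    start_offset: i64,       // offset from which the next slice begins
}

struct ToySlice {
    events: Vec<(i64, &'static str)>, // (offset just past the event, payload)
    cursor: usize,
}

impl Iterator for ToySlice {
    type Item = (i64, &'static str);

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.events.get(self.cursor).copied();
        if item.is_some() {
            self.cursor += 1;
        }
        item
    }
}

impl ToyReader {
    // Hand out every event that starts at or after the current start offset.
    fn acquire(&self) -> ToySlice {
        let mut offset = 0i64;
        let mut events = Vec::new();
        for payload in &self.data {
            let end = offset + payload.len() as i64;
            if offset >= self.start_offset {
                events.push((end, *payload));
            }
            offset = end;
        }
        ToySlice { events, cursor: 0 }
    }

    // Give the slice back and remember how far the caller actually read.
    fn release_at(&mut self, _slice: ToySlice, offset: i64) {
        self.start_offset = offset;
    }
}

fn main() {
    let mut reader = ToyReader { data: vec!["aa", "bbb", "c"], start_offset: 0 };
    let mut slice = reader.acquire();
    // Consume only the first event, then release at the offset just past it.
    let (end_offset, payload) = slice.next().expect("first event");
    println!("read {payload}, consumed up to offset {end_offset}");
    reader.release_at(slice, end_offset);
    // The unread events are available again on the next acquire.
    let remaining: Vec<&str> = reader.acquire().map(|(_, p)| p).collect();
    println!("next slice yields {:?}", remaining);
}
```

In the real client the offset semantics are defined by the library; the toy model only captures why a partially consumed slice needs an explicit release point.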
pub async fn reader_offline(&mut self) -> Result<(), EventReaderError>
Marks the reader as offline after calling reader_offline_internal.
Note: this method may return an error indicating that the reader has already been removed. This means that another thread has removed this reader from the ReaderGroup, probably because the host of this reader was assumed dead.
pub async fn acquire_segment(
    &mut self
) -> Result<Option<SegmentSlice>, EventReaderError>
Returns a SegmentSlice built from the data received from the SegmentStore(s).
Individual events can be read from the received data using SegmentSlice.next().
Invoking this function multiple times ensures that multiple SegmentSlices, corresponding to different Segments of the stream, are received. If data arrives for a SegmentSlice that has already been acquired, this method waits until that SegmentSlice is completely consumed before returning the data.
Note: this method may return an error indicating that the reader is not online. This means that another thread has removed this reader from the ReaderGroup, probably because the host of this reader was assumed dead.
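Because the return type is Result<Option<SegmentSlice>, EventReaderError>, a caller has three outcomes to handle. The following is a minimal sketch of that branching using hypothetical stand-in types (ToySlice, ToyReaderError, NextStep) rather than the real pravega_client types. Treating Ok(None) as "nothing to read right now" is an assumption of this sketch, not something stated above.

```rust
// Hypothetical stand-ins; the real types are SegmentSlice and EventReaderError.
#[derive(Debug)]
struct ToySlice(Vec<&'static str>);

#[derive(Debug)]
enum ToyReaderError {
    ReaderOffline,
}

/// What a read loop might decide to do after one acquire call.
#[derive(Debug, PartialEq)]
enum NextStep {
    Process(usize), // number of events available in the slice
    RetryLater,     // assumed meaning of Ok(None): no slice available yet
    Stop,           // the reader was removed from the reader group
}

fn next_step(outcome: Result<Option<ToySlice>, ToyReaderError>) -> NextStep {
    match outcome {
        Ok(Some(slice)) => NextStep::Process(slice.0.len()),
        Ok(None) => NextStep::RetryLater,
        Err(ToyReaderError::ReaderOffline) => NextStep::Stop,
    }
}

fn main() {
    println!("{:?}", next_step(Ok(Some(ToySlice(vec!["e1", "e2"])))));
    println!("{:?}", next_step(Ok(None)));
    println!("{:?}", next_step(Err(ToyReaderError::ReaderOffline)));
}
```

Matching on all three cases explicitly, instead of calling expect as the example at the top of this page does, lets a long-running reader shut down cleanly once it has been marked offline.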
Trait Implementations§
Auto Trait Implementations§
impl Freeze for EventReader
impl !RefUnwindSafe for EventReader
impl Send for EventReader
impl Sync for EventReader
impl Unpin for EventReader
impl !UnwindSafe for EventReader
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request.