pub struct EventReader {
    pub id: Reader,
    /* private fields */
}

Read events from a Stream.

An event reader fetches data from its assigned segments as a SegmentSlice, where a SegmentSlice represents data from a Pravega Segment. It provides the following APIs.

  1. A method to initialize the event reader: EventReader#init.
  2. A method to obtain a SegmentSlice to read events from a Pravega segment: EventReader#acquire_segment. The user can use the SegmentSlice's iterator API to fetch individual events from a given SegmentSlice.
  3. A method to release the segment back at a given offset: EventReader#release_segment_at. This method needs to be invoked only if the user does not consume all the events in a SegmentSlice.
  4. A method to mark the reader as offline: EventReader#reader_offline. This method ensures that the segments owned by this reader are transferred to other readers in the reader group.

Examples

use pravega_client_config::{ClientConfigBuilder, MOCK_CONTROLLER_URI};
use pravega_client::client_factory::ClientFactory;
use pravega_client_shared::{ScopedStream, Scope, Stream};

#[tokio::main]
async fn main() {
    let config = ClientConfigBuilder::default()
        .controller_uri(MOCK_CONTROLLER_URI)
        .build()
        .expect("creating config");
    let client_factory = ClientFactory::new(config);
    let stream = ScopedStream {
        scope: Scope::from("scope".to_string()),
        stream: Stream::from("stream".to_string()),
    };
    // Create a reader group to read data from the Pravega stream.
    let rg = client_factory.create_reader_group("rg".to_string(), stream).await;
    // Create a reader under the reader group. The segments of the stream are assigned among the
    // readers which are part of the reader group.
    let mut reader1 = rg.create_reader("r1".to_string()).await;
    // Read all events from a given segment slice.
    if let Some(mut segment_slice) = reader1.acquire_segment().await.expect("Failed to acquire segment since the reader is offline") {
        while let Some(event) = segment_slice.next() {
            println!("Event read is {:?}", event);
        }
    }
    // Read one event from a given segment slice and return the slice to the reader.
    if let Some(mut segment_slice) = reader1.acquire_segment().await.expect("Failed to acquire segment since the reader is offline") {
        if let Some(event) = segment_slice.next() {
            println!("Event read is {:?}", event);
            // Release the partially read segment slice back to the reader.
            reader1.release_segment(segment_slice).await.expect("Failed to release segment slice");
        }
    }
}

Fields§

§id: Reader

Implementations§

source§

impl EventReader

source

pub async fn release_segment( &mut self, slice: SegmentSlice ) -> Result<(), EventReaderError>

Release a partially read segment slice back to the event reader.

Note: this method may return an error indicating that the reader has already been removed. This means that another thread removed this reader from the ReaderGroup, probably because the host of this reader was assumed dead.

source

pub async fn release_segment_at( &mut self, slice: SegmentSlice, offset: i64 ) -> Result<(), EventReaderError>

Release a segment back to the reader and also indicate the offset up to which the segment slice is consumed.

Note: this method may return an error indicating that the reader has already been removed. This means that another thread removed this reader from the ReaderGroup, probably because the host of this reader was assumed dead.
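
The effect of releasing at an offset can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `MockEvent` and `OffsetTrackingReader` are hypothetical stand-ins, not pravega_client types, the methods are synchronous rather than async, and the released offset is assumed to be the position from which the next slice starts.

```rust
/// Hypothetical stand-in for an event: its starting offset in the segment and its payload.
#[derive(Debug, Clone, PartialEq)]
struct MockEvent {
    offset: i64,
    payload: &'static str,
}

/// Hypothetical stand-in for a reader that owns a single segment and remembers
/// the offset from which the next slice should start.
struct OffsetTrackingReader {
    segment: Vec<MockEvent>,
    next_offset: i64,
}

impl OffsetTrackingReader {
    /// Hands out every event located at or beyond the remembered offset.
    fn acquire_segment(&self) -> Vec<MockEvent> {
        self.segment
            .iter()
            .filter(|e| e.offset >= self.next_offset)
            .cloned()
            .collect()
    }

    /// Takes the slice back and records the offset up to which it was consumed.
    fn release_segment_at(&mut self, _slice: Vec<MockEvent>, offset: i64) {
        self.next_offset = offset;
    }
}
```

In this model, a caller that reads only the first event and then releases the slice at the offset of the second event gets the second event first on the next acquire.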

source

pub async fn reader_offline(&mut self) -> Result<(), EventReaderError>

Mark the reader as offline after calling reader_offline_internal.

Note: this method may return an error indicating that the reader has already been removed. This means that another thread removed this reader from the ReaderGroup, probably because the host of this reader was assumed dead.
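
The hand-over of segments when a reader goes offline, and the "already removed" error, can be pictured with the toy model below. Everything in it is an assumption made for illustration: `MockReaderGroup` is a hypothetical type, and the round-robin redistribution is a placeholder policy, not a description of how Pravega actually rebalances segments.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of reader-group state: reader name -> assigned segment ids.
struct MockReaderGroup {
    assignments: BTreeMap<String, Vec<u32>>,
}

impl MockReaderGroup {
    /// Removes `reader` and hands its segments to the remaining readers in
    /// round-robin order (an assumed policy). Returns an error if the reader
    /// was already removed from the group.
    fn reader_offline(&mut self, reader: &str) -> Result<(), String> {
        let orphaned = self
            .assignments
            .remove(reader)
            .ok_or_else(|| format!("reader {} already removed", reader))?;
        let remaining: Vec<String> = self.assignments.keys().cloned().collect();
        if remaining.is_empty() {
            // No reader is left; in this sketch the segments simply stay unassigned.
            return Ok(());
        }
        for (i, segment) in orphaned.into_iter().enumerate() {
            let target = &remaining[i % remaining.len()];
            self.assignments.get_mut(target).unwrap().push(segment);
        }
        Ok(())
    }
}
```

Calling `reader_offline` a second time for the same reader returns the error branch, which mirrors the situation described in the note: the reader is no longer part of the group.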

source

pub async fn acquire_segment( &mut self ) -> Result<Option<SegmentSlice>, EventReaderError>

This function returns a SegmentSlice from the data received from the SegmentStore(s). Individual events can be read from the data received using SegmentSlice.next().

Invoking this function multiple times ensures that multiple SegmentSlices corresponding to different Segments of the stream are received. In case data is received for an already acquired SegmentSlice, this method waits until that SegmentSlice is completely consumed before returning the data.

Note: this method may return an error indicating that the reader is not online. This means that another thread removed this reader from the ReaderGroup, probably because the host of this reader was assumed dead.
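
The return type has three possible outcomes, which can be handled with a single match. The sketch below is a minimal illustration using hypothetical stand-in types (`MockSlice`, `MockReaderError`) in place of the real SegmentSlice and EventReaderError, so it runs without a Pravega cluster. Only the `Result<Option<_>, _>` shape is taken from the signature above; treating `Ok(None)` as "no slice returned, nothing to read" is an assumption.

```rust
/// Hypothetical stand-in for `SegmentSlice`: an iterator over raw event payloads.
struct MockSlice {
    events: std::vec::IntoIter<Vec<u8>>,
}

impl Iterator for MockSlice {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        self.events.next()
    }
}

/// Hypothetical stand-in for `EventReaderError`.
#[derive(Debug)]
struct MockReaderError(String);

/// Drains the slice if one was returned and reports how many events were read.
/// Returns Ok(0) when no slice was returned and propagates the error otherwise.
fn drain(outcome: Result<Option<MockSlice>, MockReaderError>) -> Result<usize, MockReaderError> {
    match outcome {
        Ok(Some(slice)) => Ok(slice.count()),
        Ok(None) => Ok(0),
        Err(e) => Err(e),
    }
}
```

Keeping the error branch separate from the `None` branch lets the caller distinguish "nothing to read right now" from "this reader is no longer online", which typically calls for different recovery steps.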

Trait Implementations§

source§

impl Drop for EventReader

source§

fn drop(&mut self)

Destructor for the reader. Dropping the reader automatically invokes reader_offline().
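
The drop-time behaviour follows the usual Rust pattern of doing cleanup in a Drop implementation. The following is a sketch of that pattern only, with a hypothetical `DropAwareReader` type and a shared flag standing in for the reader group's view of the reader; it is not the actual EventReader implementation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a reader whose online status is visible to others.
struct DropAwareReader {
    online: Arc<AtomicBool>,
}

impl DropAwareReader {
    /// Creates the reader and marks it as online in the shared flag.
    fn new(online: Arc<AtomicBool>) -> Self {
        online.store(true, Ordering::SeqCst);
        DropAwareReader { online }
    }

    /// Marks the reader as offline; calling it more than once is harmless here.
    fn reader_offline(&mut self) {
        self.online.store(false, Ordering::SeqCst);
    }
}

impl Drop for DropAwareReader {
    /// Mirrors the documented behaviour: dropping the reader marks it offline.
    fn drop(&mut self) {
        self.reader_offline();
    }
}
```

With this pattern the flag flips to offline as soon as the reader goes out of scope, even if the caller never invokes `reader_offline` explicitly.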

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
§

impl<T> Pointable for T

§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more