Byte Client
Unlike EventWriter, ByteWriter writes raw bytes into Pravega Stream without adding any headers or encoding. This means that the data stored in the Stream is a continuous stream of bytes. Because of that, data written by ByteWriter can only be read by ByteReader.
The Byte Client is useful in cases where a raw stream of bytes is desirable such as video streaming.
Example walkthrough
Create a ClientFactory
Applications should use ClientFactory
to initialize components. The client doesn't expose
the underlying new
method to users.
#![allow(unused)] fn main() { // assuming pravega controller is listening at localhost:9090 let config = ClientConfigBuilder::default() .controller_uri("localhost:9090") .build() .expect("creating config"); let client_factory = ClientFactory::new(config); }
Create a Byte Writer
Assuming a new stream mystream
has been created under scope myscope
and
it contains a segment whose segmentId is 0.
#![allow(unused)] fn main() { let segment = ScopedSegment::from("myscope/mystream/0"); let mut byte_writer = client_factory.create_byte_writer(segment); }
Seek to tail
Sometimes applications use Byte Writer to write to a segment that contains some preexisting data. In this case, we need to find out the tail offset of the segment first.
#![allow(unused)] fn main() { byte_writer.seek_to_tail(); }
Write some data
It doesn't mean the data is persisted on the server side
when write method returns Ok()
, user should call writer.flush()
to ensure
all data has been acknowledged by the server.
#![allow(unused)] fn main() { let payload = "hello world".to_string().into_bytes(); byte_writer.write(&payload).expect("write"); byte_writer.flush().expect("flush"); }
Truncate the segment
ByteWriter
can also truncate the segment. Truncation means that the
data prior to some offset is not needed anymore. Truncated data cannot be read.
Applications use truncation to save space on the server.
#![allow(unused)] fn main() { byte_writer.truncate_data_before(4).await.expect("truncate segment"); }
Seal the segment
Sealing a segment is basically to make a segment read-only.
#![allow(unused)] fn main() { byte_writer.seal().await.expect("seal segment"); }
Create a Byte Reader
Create a ByteReader
to read the same segment we just write.
#![allow(unused)] fn main() { let segment = ScopedSegment::from("myscope/mystream/0"); let mut byte_reader = client_factory.create_byte_reader(segment); }
Read from an offset
ByteReader
can seek to any offset in the segment and read from it. It also
provides a useful method to show the current head of the readable offset. Remember that
the truncated data is not readable anymore, so the current head of the readable offset
is the truncation offset.
read
method will block until some data are fetched from the server. If returned size
is 0,
then it reaches the end of segment and no more data could be read from this offset.
#![allow(unused)] fn main() { let offset = byte_reader.current_head().await.expect("get current head offset"); let mut buf: Vec<u8> = vec![0; 4]; let size = byte_reader.read(&mut buf).await.expect("read from byte stream"); }
Put it together
use pravega_client_config::ClientConfigBuilder; use pravega_client::client_factory::ClientFactory; use pravega_client_shared::ScopedSegment; use std::io::Write; #[tokio::main] async fn main() { let config = ClientConfigBuilder::default() .controller_uri("localhost:9090") .build() .expect("creating config"); let client_factory = ClientFactory::new(config); let segment = ScopedSegment::from("myscope/mystream/0"); // write let mut byte_writer = client_factory.create_byte_writer(segment); byte_writer.seek_to_tail().await; let payload = "hello world".to_string().into_bytes(); byte_writer.write(&payload).await.expect("write"); byte_writer.flush().await.expect("flush"); byte_writer.truncate_data_before(4).await.expect("truncate segment"); byte_writer.seal().await.expect("seal segment"); // read let segment = ScopedSegment::from("myscope/mystream/0"); let mut byte_reader = client_factory.create_byte_reader(segment).await; let offset = byte_reader.current_head().await.expect("get current head offset"); let mut buf: Vec<u8> = vec![0; 4]; let size = byte_reader.read(&mut buf).await.expect("read from byte stream"); }
For more details please refer API docs.