Pravega Rust Client and Pravega language bindings user guide.
Welcome to the Pravega Rust client and Pravega language bindings user guide! This book is a companion to the Pravega Rust client and to the language bindings built on top of it.
This book contains examples and documentation to explain all of Pravega Rust client's use cases in detail.
Please choose from the chapters on the left to jump to individual topics, or continue below to start with the Pravega Rust client's README.
Rust client for Pravega
This is a native Rust client for Pravega.
Note: Pravega 0.9.0+ is required.
Status
Up-to-date status can be found on the wiki.
Goals
The goal is to allow clients to be written in Rust, and to provide a common implementation for clients in higher-level languages, including Nodejs.
See the wiki for the status of each language.
Approach
The approach is to write a common native implementation of the client internals in Rust, and then use a C ABI to provide an interface that other languages can link against.
Finally, for each supported language, the low-level API is translated into a high-level API that is idiomatic for that language.
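A C ABI layer like the one described above commonly uses an opaque-handle pattern: Rust owns the state, foreign code only holds a pointer, and Rust frees every allocation it hands out. The sketch below illustrates that pattern under stated assumptions; `ClientHandle`, `client_handle_new`, `client_handle_write`, and `client_handle_free` are hypothetical names invented for this example and are not part of the Pravega client. A real export would also keep the symbol names unmangled with `#[no_mangle]` (written `#[unsafe(no_mangle)]` in the 2024 edition); that attribute is omitted so the snippet compiles under any edition.

```rust
use std::slice;

/// Hypothetical client state that lives on the Rust side.
/// Foreign callers only ever see an opaque pointer to it.
pub struct ClientHandle {
    bytes_written: u64,
}

/// Allocates the state on the Rust heap and returns an opaque pointer.
pub extern "C" fn client_handle_new() -> *mut ClientHandle {
    Box::into_raw(Box::new(ClientHandle { bytes_written: 0 }))
}

/// Takes a C-style buffer (pointer + length) and returns the running total
/// of bytes received, or 0 if the handle or buffer pointer is null.
pub extern "C" fn client_handle_write(handle: *mut ClientHandle, data: *const u8, len: usize) -> u64 {
    if handle.is_null() || (data.is_null() && len > 0) {
        return 0;
    }
    // SAFETY: the caller promises `handle` came from `client_handle_new`
    // and that `data` points to `len` readable bytes.
    let state = unsafe { &mut *handle };
    let buf: &[u8] = if len == 0 { &[] } else { unsafe { slice::from_raw_parts(data, len) } };
    state.bytes_written += buf.len() as u64;
    state.bytes_written
}

/// Hands ownership back to Rust so the state is dropped exactly once.
pub extern "C" fn client_handle_free(handle: *mut ClientHandle) {
    if !handle.is_null() {
        // SAFETY: `handle` was produced by `Box::into_raw` in `client_handle_new`.
        unsafe { drop(Box::from_raw(handle)) };
    }
}
```

A language binding would then wrap these three calls in a class or object with its own idiomatic lifetime management, which is the "high-level API" step.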
Book
Check out the Pravega Rust client book for more details.
License
The Pravega Rust client, including all of its source code, is released under the Apache License v2.0.
Quick Start
This chapter demonstrates how to use the Pravega Rust client to communicate with a standalone Pravega server.
Running a standalone Pravega server
Navigate to the Pravega Release page and download a Pravega release. Note that ByteStreamClient requires Pravega 0.9.0+.
For example, in a Linux environment, after downloading and decompressing pravega-0.9.0.tgz, we can start a minimal Pravega server by calling
./pravega-0.9.0/bin/pravega-standalone
It spins up a standalone Pravega server that listens on localhost:9090 by default.
Build a simple application
Prerequisites
Make sure you have Rust installed first; see the official Rust website for installation instructions.
Creating a new project
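Create a new Rust project called my_app:
cargo new my_app --bin
In the Cargo.toml file, add the following dependencies. The tokio "full" feature set includes the macros and multi-threaded runtime needed by the #[tokio::main] attribute used in the examples.
[dependencies]
pravega-client = "0.1"
pravega-client-config = "0.1"
pravega-client-shared = "0.1"
tokio = { version = "1", features = ["full"] }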
A simple app that writes and reads events
Check out the event write and read example.
Rust Client
This section describes the Rust client APIs to interact with Pravega.
- Event API
- Byte API
Event Client
Event Writer
An EventWriter can be used to write events into the Pravega Stream.
An Event is a discrete item that can be read and processed independently. Events are written atomically and will appear in the Stream exactly once. An optional routing key can be specified by the user. Events with the same routing key are guaranteed to be read back in the order they were written, while events with different routing keys may be read and processed in parallel.
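The ordering guarantee follows from how routing keys are placed: Pravega maps each routing key to a position in a key space that is divided among the Stream's segments, and each segment is an append-only, ordered log. The sketch below models only that idea under simplifying assumptions: it uses the standard library's `DefaultHasher` and equal-width key ranges, which are not Pravega's actual hashing or segment layout, and `key_position`, `segment_for`, and `route_events` are hypothetical helpers.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hash a routing key to a position in the key space [0, 1).
fn key_position(routing_key: &str) -> f64 {
    let mut hasher = DefaultHasher::new();
    routing_key.hash(&mut hasher);
    // Keep the top 53 bits so the value fits exactly in an f64 mantissa.
    (hasher.finish() >> 11) as f64 / (1u64 << 53) as f64
}

/// Segment that owns the key when [0, 1) is split into `num_segments` equal ranges.
fn segment_for(routing_key: &str, num_segments: usize) -> usize {
    ((key_position(routing_key) * num_segments as f64) as usize).min(num_segments - 1)
}

/// Append each (routing key, event) pair to its segment; each segment is an ordered log.
fn route_events(events: &[(&str, &str)], num_segments: usize) -> HashMap<usize, Vec<String>> {
    let mut segments: HashMap<usize, Vec<String>> = HashMap::new();
    for &(key, event) in events {
        segments
            .entry(segment_for(key, num_segments))
            .or_default()
            .push(event.to_string());
    }
    segments
}
```

Because every event with the same key lands in the same ordered log, readers see those events in write order, while events whose keys fall in other segments can be consumed in parallel.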
Event Reader
An EventReader can be used to read events from the Pravega Stream.
Every EventReader belongs to a ReaderGroup. All the EventReaders within a ReaderGroup work together to read from a Stream. This allows maximum read performance without duplicate reads. EventReaders internally coordinate with each other within a ReaderGroup using a so-called Synchronizer.
For details on how the Synchronizer works, please see the related blog post on the state synchronizer.
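"No duplicate reads" amounts to each segment having exactly one owning reader in the group at a time. As a rough illustration only, the sketch below splits segments among readers with a static round-robin; this is a simplification, not Pravega's algorithm, since real readers agree on and rebalance ownership dynamically through the Synchronizer. `assign_segments` is a hypothetical helper.

```rust
use std::collections::BTreeMap;

/// Give every segment exactly one owning reader, spreading them round-robin.
fn assign_segments(segments: &[u32], readers: &[&str]) -> BTreeMap<String, Vec<u32>> {
    let mut owned: BTreeMap<String, Vec<u32>> =
        readers.iter().map(|r| (r.to_string(), Vec::new())).collect();
    if readers.is_empty() {
        return owned;
    }
    for (i, &segment) in segments.iter().enumerate() {
        let reader = readers[i % readers.len()];
        // `reader` was inserted as a key above, so the lookup cannot fail.
        owned.get_mut(reader).expect("known reader").push(segment);
    }
    owned
}
```

Every segment appears in exactly one reader's list, so no event is delivered twice, and adding readers spreads the segments (and the read load) more thinly.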
Transactional Event Writer
A Pravega Transaction provides a mechanism for writing many events atomically. A Transaction is unbounded in size but bounded in time: if it has not been committed within the time window specified at its creation, it is automatically aborted.
For more details, please refer to the API docs.
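The "unbounded in size, bounded in time" rule can be pictured as a buffer guarded by a deadline. The sketch below is a hypothetical toy model (`Txn`, `write_event`, `commit`, and the deadline check are invented for illustration and are not the client's transaction API); in Pravega the timeout is tracked on the server side rather than by the writer.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum TxnState {
    Open,
    Committed,
    Aborted,
}

#[derive(Debug, PartialEq)]
enum TxnError {
    Expired,
    NotOpen,
}

/// Toy transaction: the event buffer can grow freely, but its lifetime is bounded.
struct Txn {
    events: Vec<Vec<u8>>,
    deadline: Instant,
    state: TxnState,
}

impl Txn {
    fn begin(lease: Duration) -> Self {
        Txn { events: Vec::new(), deadline: Instant::now() + lease, state: TxnState::Open }
    }

    /// Fails if already committed or aborted; aborts automatically once the deadline passes.
    fn check_open(&mut self) -> Result<(), TxnError> {
        if self.state != TxnState::Open {
            return Err(TxnError::NotOpen);
        }
        if Instant::now() > self.deadline {
            self.state = TxnState::Aborted;
            self.events.clear();
            return Err(TxnError::Expired);
        }
        Ok(())
    }

    /// Buffer an event; it is not visible in the stream yet.
    fn write_event(&mut self, event: &[u8]) -> Result<(), TxnError> {
        self.check_open()?;
        self.events.push(event.to_vec());
        Ok(())
    }

    /// Publish every buffered event at once (all or nothing).
    fn commit(&mut self, stream: &mut Vec<Vec<u8>>) -> Result<(), TxnError> {
        self.check_open()?;
        stream.append(&mut self.events);
        self.state = TxnState::Committed;
        Ok(())
    }
}
```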
Byte Client
Unlike EventWriter, ByteWriter writes raw bytes into a Pravega Stream without adding any headers or encoding. This means that the data stored in the Stream is a continuous stream of bytes. Because of that, data written by ByteWriter can only be read by ByteReader.
The Byte Client is useful in cases where a raw stream of bytes is desirable, such as video streaming.
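To see why the two kinds of data are not interchangeable, consider how an event writer frames each event. The sketch below assumes an 8-byte header in front of every event (a 4-byte type code followed by a 4-byte big-endian length). That layout is an assumption made for illustration, although it is consistent with the Nodejs example later in this guide, where 12-byte "Hello World!" events are reported at offsets 0, 20, 40, and so on. `frame_event`, `append_raw`, and `decode_events` are hypothetical helpers, not client APIs.

```rust
/// Assumed event header: 4-byte type code + 4-byte big-endian payload length.
const HEADER_LEN: usize = 8;
const EVENT_TYPE_CODE: u32 = 0; // assumed value, for illustration only

/// What an event writer stores: header followed by payload.
fn frame_event(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
    out.extend_from_slice(&EVENT_TYPE_CODE.to_be_bytes());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// What a byte writer stores: the payload, unchanged.
fn append_raw(stream: &mut Vec<u8>, data: &[u8]) {
    stream.extend_from_slice(data);
}

/// Split framed bytes back into events; None if the bytes are not a clean sequence of frames.
fn decode_events(mut bytes: &[u8]) -> Option<Vec<Vec<u8>>> {
    let mut events = Vec::new();
    while !bytes.is_empty() {
        if bytes.len() < HEADER_LEN {
            return None;
        }
        let len = u32::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]) as usize;
        if bytes.len() - HEADER_LEN < len {
            return None;
        }
        events.push(bytes[HEADER_LEN..HEADER_LEN + len].to_vec());
        bytes = &bytes[HEADER_LEN + len..];
    }
    Some(events)
}
```

An event decoder misinterprets raw bytes as headers, and a byte reader sees headers mixed into the payload, which is why each kind of data should be read with its matching reader.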
Example walkthrough
Create a ClientFactory
Applications should use ClientFactory to initialize components. The client does not expose the components' underlying new methods to users.
// assuming the Pravega controller is listening at localhost:9090
let config = ClientConfigBuilder::default()
    .controller_uri("localhost:9090")
    .build()
    .expect("creating config");
let client_factory = ClientFactory::new(config);
Create a Byte Writer
Assume a new stream mystream has been created under scope myscope and that it contains a segment whose segment ID is 0.
let segment = ScopedSegment::from("myscope/mystream/0");
let mut byte_writer = client_factory.create_byte_writer(segment);
Seek to tail
Sometimes applications use a ByteWriter to write to a segment that already contains some data. In this case, the writer first needs to seek to the tail offset of the segment.
byte_writer.seek_to_tail().await;
Write some data
When the write method returns Ok(), it does not mean the data has been persisted on the server side. Call byte_writer.flush() to ensure that all data has been acknowledged by the server.
let payload = "hello world".to_string().into_bytes();
byte_writer.write(&payload).await.expect("write");
// flush() returns only after the server has acknowledged all written data
byte_writer.flush().await.expect("flush");
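The difference between "write returned" and "data acknowledged" can be made concrete with a toy in-memory model. `Segment` and `ToyByteWriter` below are hypothetical stand-ins, not the client's ByteWriter; they only show that `write` buffers locally, `flush` is the point after which data is on the "server", and `seek_to_tail` makes new writes land after preexisting data.

```rust
/// Stand-in for the server-side segment: only acknowledged bytes live here.
struct Segment {
    data: Vec<u8>,
}

/// Toy writer: write() only buffers locally; flush() pushes the buffer and waits for the "ack".
struct ToyByteWriter {
    pending: Vec<u8>,
    acked_offset: u64,
}

impl ToyByteWriter {
    fn new() -> Self {
        ToyByteWriter { pending: Vec::new(), acked_offset: 0 }
    }

    /// Start appending after any preexisting data in the segment.
    fn seek_to_tail(&mut self, segment: &Segment) {
        self.acked_offset = segment.data.len() as u64;
    }

    /// Returns as soon as the bytes are buffered; nothing is durable yet.
    fn write(&mut self, buf: &[u8]) -> usize {
        self.pending.extend_from_slice(buf);
        buf.len()
    }

    /// Only after this returns are all written bytes stored in the segment.
    fn flush(&mut self, segment: &mut Segment) {
        segment.data.append(&mut self.pending);
        self.acked_offset = segment.data.len() as u64;
    }

    /// Offset where the next written byte will land.
    fn current_offset(&self) -> u64 {
        self.acked_offset + self.pending.len() as u64
    }
}
```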
Truncate the segment
ByteWriter can also truncate the segment. Truncation means that the data prior to some offset is no longer needed. Truncated data cannot be read. Applications use truncation to save space on the server.
byte_writer.truncate_data_before(4).await.expect("truncate segment");
Seal the segment
Sealing a segment makes it read-only: no more data can be appended to it.
byte_writer.seal().await.expect("seal segment");
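Truncation and sealing can be summarized with a toy segment that keeps absolute offsets. `ToySegment` is a hypothetical in-memory model, not the client's API: truncating before offset 4 discards the first four bytes but leaves the remaining bytes at their original offsets, and once sealed the segment rejects further appends.

```rust
#[derive(Debug, PartialEq)]
enum SegmentError {
    Sealed,
}

/// Toy segment using absolute offsets: bytes before `head` have been truncated away.
struct ToySegment {
    head: u64,
    data: Vec<u8>, // bytes from `head` up to the tail
    sealed: bool,
}

impl ToySegment {
    fn new() -> Self {
        ToySegment { head: 0, data: Vec::new(), sealed: false }
    }

    fn tail(&self) -> u64 {
        self.head + self.data.len() as u64
    }

    /// Append bytes and return the new tail offset, unless the segment is sealed.
    fn append(&mut self, buf: &[u8]) -> Result<u64, SegmentError> {
        if self.sealed {
            return Err(SegmentError::Sealed);
        }
        self.data.extend_from_slice(buf);
        Ok(self.tail())
    }

    /// Discard data before `offset` (clamped to the tail); remaining offsets do not change.
    fn truncate_data_before(&mut self, offset: u64) {
        if offset <= self.head {
            return;
        }
        let cut = (offset.min(self.tail()) - self.head) as usize;
        self.data.drain(..cut);
        self.head += cut as u64;
    }

    /// Make the segment read-only.
    fn seal(&mut self) {
        self.sealed = true;
    }
}
```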
Create a Byte Reader
Create a ByteReader to read the same segment we just wrote to.
let segment = ScopedSegment::from("myscope/mystream/0");
let mut byte_reader = client_factory.create_byte_reader(segment).await;
Read from an offset
ByteReader can seek to any offset in the segment and read from it. It also provides a useful method that returns the current head of the readable data. Remember that truncated data is no longer readable, so the current head of the readable data is the truncation offset.
The read method blocks until some data is fetched from the server. If the returned size is 0, the reader has reached the end of the segment and no more data can be read from this offset.
let offset = byte_reader.current_head().await.expect("get current head offset");
let mut buf: Vec<u8> = vec![0; 4];
let size = byte_reader.read(&mut buf).await.expect("read from byte stream");
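The read loop implied above (start at the current head, keep reading until a read returns 0) can be sketched against a toy in-memory reader. `ToyByteReader` and `read_all_from_head` are hypothetical and do not block the way the real ByteReader does; the data mirrors the example, where "hello world" was truncated before offset 4, leaving "o world" readable from offset 4.

```rust
/// Toy reader over a segment whose bytes before `head` were truncated.
struct ToyByteReader<'a> {
    head: u64,
    data: &'a [u8], // readable bytes, starting at absolute offset `head`
    offset: u64,
}

impl<'a> ToyByteReader<'a> {
    fn new(head: u64, data: &'a [u8]) -> Self {
        ToyByteReader { head, data, offset: head }
    }

    fn current_head(&self) -> u64 {
        self.head
    }

    /// Seeking into truncated data is an error.
    fn seek(&mut self, offset: u64) -> Result<(), String> {
        if offset < self.head {
            return Err(format!("offset {} was truncated (head is {})", offset, self.head));
        }
        self.offset = offset;
        Ok(())
    }

    /// Copy up to buf.len() bytes; 0 means the end of the segment was reached.
    fn read(&mut self, buf: &mut [u8]) -> usize {
        let start = (self.offset - self.head) as usize;
        if start >= self.data.len() {
            return 0;
        }
        let n = buf.len().min(self.data.len() - start);
        buf[..n].copy_from_slice(&self.data[start..start + n]);
        self.offset += n as u64;
        n
    }
}

/// Read from the current head until read() returns 0.
fn read_all_from_head(reader: &mut ToyByteReader<'_>) -> Vec<u8> {
    let head = reader.current_head();
    reader.seek(head).expect("the head is always readable");
    let mut out = Vec::new();
    let mut buf = vec![0u8; 4];
    loop {
        let n = reader.read(&mut buf);
        if n == 0 {
            break;
        }
        out.extend_from_slice(&buf[..n]);
    }
    out
}
```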
Put it together
use pravega_client_config::ClientConfigBuilder;
use pravega_client::client_factory::ClientFactory;
use pravega_client_shared::ScopedSegment;
use std::io::Write;

#[tokio::main]
async fn main() {
    let config = ClientConfigBuilder::default()
        .controller_uri("localhost:9090")
        .build()
        .expect("creating config");
    let client_factory = ClientFactory::new(config);
    let segment = ScopedSegment::from("myscope/mystream/0");

    // write
    let mut byte_writer = client_factory.create_byte_writer(segment);
    byte_writer.seek_to_tail().await;
    let payload = "hello world".to_string().into_bytes();
    byte_writer.write(&payload).await.expect("write");
    byte_writer.flush().await.expect("flush");
    byte_writer.truncate_data_before(4).await.expect("truncate segment");
    byte_writer.seal().await.expect("seal segment");

    // read
    let segment = ScopedSegment::from("myscope/mystream/0");
    let mut byte_reader = client_factory.create_byte_reader(segment).await;
    let offset = byte_reader.current_head().await.expect("get current head offset");
    let mut buf: Vec<u8> = vec![0; 4];
    let size = byte_reader.read(&mut buf).await.expect("read from byte stream");
}
For more details, please refer to the API docs.
Developer Corner
Nodejs Client
This section describes:
- Example of Nodejs client to interact with Pravega.
- Steps to generate Nodejs Bindings for Pravega from the Rust client.
- Nodejs API reference.
Pravega Nodejs Client
This project provides a way to interact with Pravega via a Nodejs client.
Pravega is an open source distributed storage service implementing Streams. It offers Stream as the main primitive for the foundation of reliable storage systems: a high-performance, durable, elastic, and unlimited append-only byte stream with strict ordering and consistency.
Install
The client library can be installed using npm or yarn.
npm install @pravega/pravega
#or
yarn add @pravega/pravega
After the package is downloaded from the registry, a node-pre-gyp install is triggered to pull the underlying Rust Node addon binary from the GitHub releases.
Note that your OS and architecture matter. Only Windows, macOS, and Linux on the x86_64 architecture come with a pre-built binary. If there is a connection problem or your platform is not supported, you need to build the native Node addon yourself by cloning the repo and running several commands, as described in the Install Native Addons Manually and Development sections below.
Example
After running npm init, add "type": "module", to your package.json so that node can load ECMAScript modules correctly. We also provide a CommonJS distribution for legacy compatibility, so feel free to use require().
import { StreamCut, StreamManager } from '@pravega/pravega';
// or the following if you are still using CommonJS (use one style or the other, not both)
// const { StreamCut, StreamManager } = require('@pravega/pravega');
const SCOPE = 'scope1';
const STREAM = 'stream1';
const DATA = 'Hello World!';
// Assume Pravega controller is listening at 127.0.0.1:9090
const stream_manager = StreamManager('tcp://127.0.0.1:9090', false, false, true);
// Assume the scope and stream don't exist.
stream_manager.create_scope(SCOPE);
// This will create a stream with only 1 segment.
stream_manager.create_stream(SCOPE, STREAM);
// Write event as string.
const stream_writer_1 = stream_manager.create_writer(SCOPE, STREAM);
await stream_writer_1.write_event(DATA);
await stream_writer_1.write_event(DATA, 'routing_key');
// Write event as bytes.
const enc = new TextEncoder();
const stream_writer_2 = stream_manager.create_writer(SCOPE, STREAM);
stream_writer_2.write_event_bytes(enc.encode(DATA));
stream_writer_2.write_event_bytes(enc.encode(DATA), 'routing_key');
// You can also write them in parallel and await flush.
await stream_writer_2.flush();
// Write events as a transaction.
const stream_txn_writer = stream_manager.create_transaction_writer(SCOPE, STREAM, BigInt(1));
const txn = await stream_txn_writer.begin_txn();
await txn.write_event(DATA);
await txn.write_event_bytes(enc.encode(DATA), 'routing_key');
// You may commit or abort the transaction.
// Events written in the transaction are not visible to readers until it is committed.
await txn.commit();
// Create a reader group and a reader.
const reader_group_name = Math.random().toString(36).slice(2, 10);
const reader_name = Math.random().toString(36).slice(2, 10);
const stream_reader_group = stream_manager.create_reader_group(
    StreamCut.head(),
    reader_group_name,
    SCOPE,
    STREAM
);
const stream_reader = stream_reader_group.create_reader(reader_name);
// Read data back from stream.
// One `get_segment_slice()` call per segment.
const seg_slice = await stream_reader.get_segment_slice();
const dec = new TextDecoder('utf-8');
for (const event of seg_slice) {
    const raw_bytes = event.data();
    console.log(`Event at ${event.offset()} reads ${dec.decode(raw_bytes)}`);
}
// Release the current slice so that other readers can lock and read it.
stream_reader.release_segment(seg_slice);
stream_reader.reader_offline();
// Clean up.
stream_manager.delete_reader_group(SCOPE, reader_group_name);
stream_manager.seal_stream(SCOPE, STREAM);
stream_manager.delete_stream(SCOPE, STREAM);
stream_manager.delete_scope(SCOPE);
With pravega-standalone running locally, you should see output like this after running the example:
$ node --version
v16.15.0
$ node index.js
Event at 0 reads Hello World!
Event at 20 reads Hello World!
Event at 40 reads Hello World!
Event at 60 reads Hello World!
Event at 80 reads Hello World!
Event at 100 reads Hello World!
Supported APIs
Check the Supported APIs Wiki page.
A full API reference may be found here.
Install Native Addons Manually
If you run into problems because of an unstable GitHub connection, you may first install the package without the prebuilt binary through npm install @pravega/pravega --ignore-scripts and then manually download the pravega-{tag}-node-v{node_api_version}-{os}-{arch}-{compiler}.tar.gz tarball from the release page. Extract the whole dist folder to ./node_modules/@pravega/pravega and you should be good to go.
Development
To build or test this binding locally, the Rust toolchain must be installed and cargo build must run without errors in the parent project.
Then install the Nodejs-related packages via npm i in this folder.
Tests
- npm run build-debug to build a debug addon.
- ./path/to/pravega/bin/pravega-standalone to start a standalone Pravega server.
- npm test to run the tests.
Local build and install
- npm run release-native to build a release addon.
- npm run release-js to build a release dist.
- npm pack to pack a local npm package.
- npm i pravega-x.y.z.tgz in your project to install and use it.