Pravega Rust Client and Pravega language bindings user guide.

Welcome to the Pravega Rust client and Pravega language bindings user guide! This book is a companion to the Pravega Rust client and the language bindings built on top of it.

This book contains examples and documentation that explain the Pravega Rust client's use cases in detail.

Please choose from the chapters on the left to jump to individual topics, or continue below to start with the Pravega Rust client's README.


Rust client for Pravega

This is a native Rust client for Pravega.

Note: Pravega 0.9.0+ is required.

Status

Up-to-date status can be seen on the wiki.

Goals

The goal is to allow clients to be written in Rust, and to provide a common implementation for clients in higher-level languages, including Nodejs.

See the wiki for the status of each language.

Approach

The approach is to write a common native implementation of the client internals in Rust, and then use a C ABI to provide an interface that other languages can link against.
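The C ABI layer described above can be sketched with one exported function. This is a minimal illustration, not the client's real interface: the function name example_scope_name_len is hypothetical. It shows the two ingredients that make a Rust function linkable from C and from bindings built on C: #[no_mangle] keeps the symbol name stable, and extern "C" selects the C calling convention.

```rust
use std::ffi::{CStr, CString};
use std::os::raw::c_char;

/// Hypothetical exported function (not part of the real client's C interface).
/// Returns the byte length of a NUL-terminated scope name, or 0 for null.
///
/// # Safety
/// `name` must be null or point to a valid NUL-terminated C string.
#[no_mangle]
pub unsafe extern "C" fn example_scope_name_len(name: *const c_char) -> usize {
    if name.is_null() {
        return 0;
    }
    // Borrow the C string without taking ownership of it.
    CStr::from_ptr(name).to_bytes().len()
}

fn main() {
    // A Rust caller standing in for a C or Nodejs binding.
    let scope = CString::new("myscope").expect("no interior NUL");
    let len = unsafe { example_scope_name_len(scope.as_ptr()) };
    println!("{}", len); // 7
}
```

A language binding would declare the same signature on its side and call the symbol directly; ownership of any memory crossing the boundary has to be agreed on explicitly, which is why the sketch only borrows the string.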

Finally, for each supported language, the low-level API is translated into a high-level API that is idiomatic for that language.

Book

Check out the Pravega Rust client book for more details.

License

The Pravega Rust client, including all of its source code, is released under the Apache License v2.0.

Quick Start

This chapter demonstrates how to use the Pravega Rust client to communicate with a standalone Pravega server.

Running a standalone Pravega server

Navigate to the Pravega Release page and download a Pravega release. Note that ByteStreamClient requires Pravega 0.9.0+.

For example, in a Linux environment, after downloading and decompressing pravega-0.9.0.tgz, you can start a minimal Pravega server by running:

./pravega-0.9.0/bin/pravega-standalone

This starts a standalone Pravega server whose controller listens on localhost:9090 by default.
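Before building the application, it can help to confirm that something is listening on the controller port. The sketch below uses only the Rust standard library; the helper name port_is_open is hypothetical, and a successful TCP connect only shows that the port is open, not that the Pravega controller is healthy.

```rust
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Returns true if a TCP connection to `addr` succeeds within `timeout`.
/// This only shows that something is listening on the port; it does not
/// check that the listener is a healthy Pravega controller.
fn port_is_open(addr: &str, timeout: Duration) -> bool {
    let addrs: Vec<SocketAddr> = match addr.to_socket_addrs() {
        Ok(resolved) => resolved.collect(),
        Err(_) => return false, // unparsable or unresolvable address
    };
    addrs
        .iter()
        .any(|a| TcpStream::connect_timeout(a, timeout).is_ok())
}

fn main() {
    let reachable = port_is_open("localhost:9090", Duration::from_secs(1));
    println!("controller port reachable: {}", reachable);
}
```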

Build a simple application

Prerequisites

Make sure you have Rust installed first; see the official Rust website for installation instructions.

Creating a new project

Create a new Rust project called my_app

cargo new my_app --bin

In the Cargo.toml file, add the following dependencies:

[dependencies]
pravega-client = "0.1"
pravega-client-config = "0.1"
pravega-client-shared = "0.1"
tokio = "1"

A simple app that writes and reads events

Check out the event write and read example.

Rust Client

This section describes the Rust client APIs to interact with Pravega.

  • Event API
  • Byte API

Event Client

Event Writer

An EventWriter can be used to write events into the Pravega Stream.

An Event is a discrete item that can be read and processed independently. Events are written atomically and will appear in the Stream exactly once. An optional routing key can be specified by the user. Events with the same routing key are guaranteed to be read back in the order they were written, while events with different routing keys may be read and processed in parallel.
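The routing-key guarantee can be illustrated with a toy model in which each routing key is hashed to one segment and each segment is an append-only, ordered log. This is a sketch under stated assumptions: std's DefaultHasher stands in for Pravega's own routing-key hashing, and segment_for, route, and payloads_for are hypothetical helpers, not client APIs.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Toy model: map a routing key to one of `num_segments` segments by hashing it.
/// DefaultHasher is a stand-in; Pravega uses its own routing-key hashing.
fn segment_for(routing_key: &str, num_segments: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    routing_key.hash(&mut hasher);
    hasher.finish() % num_segments
}

/// Appends each (routing key, payload) event to its segment's log.
/// Each segment is an ordered log, so same-key events keep their write order.
fn route(events: &[(&str, &str)], num_segments: u64) -> HashMap<u64, Vec<(String, String)>> {
    let mut segments: HashMap<u64, Vec<(String, String)>> = HashMap::new();
    for (key, payload) in events {
        segments
            .entry(segment_for(key, num_segments))
            .or_default()
            .push((key.to_string(), payload.to_string()));
    }
    segments
}

/// Returns the payloads written with `key`, in the order they appear in its segment.
fn payloads_for<'a>(
    segments: &'a HashMap<u64, Vec<(String, String)>>,
    key: &str,
    num_segments: u64,
) -> Vec<&'a str> {
    segments
        .get(&segment_for(key, num_segments))
        .map(|log| log.iter().filter(|(k, _)| k == key).map(|(_, p)| p.as_str()).collect())
        .unwrap_or_default()
}

fn main() {
    let events = [("user-1", "a"), ("user-2", "x"), ("user-1", "b"), ("user-1", "c")];
    let segments = route(&events, 4);
    // Whatever segment "user-1" hashes to, its events come back in write order.
    println!("{:?}", payloads_for(&segments, "user-1", 4)); // ["a", "b", "c"]
}
```

Events with different keys may land in different segments, which is what lets them be processed in parallel; the model makes no ordering promise across segments.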

API docs.

Event Reader

An EventReader can be used to read events from the Pravega Stream.

Every EventReader belongs to a ReaderGroup. All the EventReaders within a ReaderGroup work together to read from a Stream. This allows maximum read performance without duplicate reads. EventReaders coordinate with each other within a ReaderGroup using a so-called State Synchronizer.
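How a ReaderGroup avoids duplicate reads can be pictured as each segment having exactly one owning reader at a time. The sketch below is a toy model only: assign_segments is a hypothetical helper with a static round-robin split, whereas real EventReaders rebalance segments dynamically through the State Synchronizer.

```rust
use std::collections::BTreeMap;

/// Toy model of a reader group: each segment is owned by exactly one reader,
/// so no segment (and hence no event) is read by two readers at once.
/// The static round-robin split is an illustration only.
fn assign_segments(readers: &[&str], segments: &[u64]) -> BTreeMap<String, Vec<u64>> {
    let mut assignment: BTreeMap<String, Vec<u64>> =
        readers.iter().map(|r| (r.to_string(), Vec::new())).collect();
    if readers.is_empty() {
        return assignment; // no readers, nothing to assign
    }
    for (i, segment) in segments.iter().enumerate() {
        let owner = readers[i % readers.len()];
        assignment
            .get_mut(owner)
            .expect("every reader has an entry")
            .push(*segment);
    }
    assignment
}

fn main() {
    let assignment = assign_segments(&["reader-1", "reader-2"], &[0, 1, 2]);
    println!("{:?}", assignment); // {"reader-1": [0, 2], "reader-2": [1]}
}
```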

For details on how the State Synchronizer works, see the related blog post on the state synchronizer.

API docs

Transactional Event Writer

A Pravega Transaction provides a mechanism for writing many events atomically. A Transaction is unbounded in size but bounded in time: if it has not been committed within the time window specified when it was created, it is automatically aborted.
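The all-or-nothing, time-bounded behavior can be modeled in a few lines. ToyTxn is a hypothetical type, not the client's Transaction API: it buffers events, appends them all at once on commit, and rejects a commit that arrives after its deadline. The real service aborts an expired transaction on its own; the toy only checks the deadline when commit is called.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum TxnError {
    Aborted,
}

/// Toy model of a time-bounded transaction (hypothetical, not the client's
/// Transaction type). Events are buffered and appended to the stream only on
/// commit, all together; a commit after the deadline is rejected as aborted.
struct ToyTxn {
    deadline: Instant,
    pending: Vec<String>,
}

impl ToyTxn {
    fn begin(lease: Duration) -> Self {
        ToyTxn { deadline: Instant::now() + lease, pending: Vec::new() }
    }

    fn write_event(&mut self, event: &str) {
        self.pending.push(event.to_string());
    }

    fn commit(self, stream: &mut Vec<String>) -> Result<(), TxnError> {
        if Instant::now() > self.deadline {
            return Err(TxnError::Aborted); // the time window has passed
        }
        stream.extend(self.pending);
        Ok(())
    }
}

fn main() {
    let mut stream: Vec<String> = Vec::new();

    let mut txn = ToyTxn::begin(Duration::from_secs(10));
    txn.write_event("e1");
    txn.write_event("e2");
    println!("before commit: {:?}", stream); // before commit: []
    txn.commit(&mut stream).expect("within the time window");
    println!("after commit: {:?}", stream); // after commit: ["e1", "e2"]

    let late = ToyTxn::begin(Duration::from_millis(0));
    std::thread::sleep(Duration::from_millis(5));
    println!("{:?}", late.commit(&mut stream)); // Err(Aborted)
}
```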

For more details, refer to the API docs.

Byte Client

Unlike EventWriter, ByteWriter writes raw bytes into a Pravega Stream without adding any headers or encoding. This means that the data stored in the Stream is a continuous stream of bytes. Because of that, data written by ByteWriter can only be read by ByteReader.
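The consequence of writing raw bytes can be shown with plain byte vectors. In this sketch, two raw writes merge into one undivided run of bytes, while a framed stream can be split back into events. The 4-byte length prefix used by append_framed is a hypothetical stand-in for the headers EventWriter adds, not Pravega's actual wire format.

```rust
/// Raw byte writes: payloads are appended back to back with no boundaries.
fn append_raw(stream: &mut Vec<u8>, payload: &[u8]) {
    stream.extend_from_slice(payload);
}

/// Hypothetical framing standing in for event headers:
/// a 4-byte big-endian length prefix before each payload.
fn append_framed(stream: &mut Vec<u8>, payload: &[u8]) {
    stream.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    stream.extend_from_slice(payload);
}

/// Splits a framed stream back into individual events.
fn decode_framed(mut bytes: &[u8]) -> Vec<Vec<u8>> {
    let mut events = Vec::new();
    while bytes.len() >= 4 {
        let len = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
        let end = 4 + len;
        if bytes.len() < end {
            break; // incomplete trailing frame
        }
        events.push(bytes[4..end].to_vec());
        bytes = &bytes[end..];
    }
    events
}

fn main() {
    let mut raw = Vec::new();
    append_raw(&mut raw, b"hello");
    append_raw(&mut raw, b"world");
    // The boundary between the two writes is gone.
    println!("{}", String::from_utf8_lossy(&raw)); // helloworld

    let mut framed = Vec::new();
    append_framed(&mut framed, b"hello");
    append_framed(&mut framed, b"world");
    println!("{}", decode_framed(&framed).len()); // 2
}
```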

The Byte Client is useful when a raw stream of bytes is desirable, such as for video streaming.

Example walkthrough

Create a ClientFactory

Applications should use ClientFactory to create client components such as writers and readers; the client does not expose the components' underlying new methods to users.


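use pravega_client::client_factory::ClientFactory;
use pravega_client_config::ClientConfigBuilder;

// The snippets in this walkthrough run inside an async main (see "Put it together").
// Assuming the Pravega controller is listening at localhost:9090.
let config = ClientConfigBuilder::default()
    .controller_uri("localhost:9090")
    .build()
    .expect("creating config");

let client_factory = ClientFactory::new(config);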

Create a Byte Writer

This step assumes that a stream mystream has already been created under the scope myscope and that it contains a segment with segment ID 0.


use pravega_client_shared::ScopedSegment;

// Reuses `client_factory` from the previous step.
let segment = ScopedSegment::from("myscope/mystream/0");
let mut byte_writer = client_factory.create_byte_writer(segment);

Seek to tail

Sometimes applications use a ByteWriter to write to a segment that already contains data. In this case, the writer first needs to seek to the tail offset of the segment.


// Position the writer at the current end of the segment before writing.
byte_writer.seek_to_tail().await;

Write some data

A write call returning Ok() does not mean that the data has been persisted on the server side. Call flush() to ensure that all data has been acknowledged by the server.
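The gap between a write returning Ok and the data reaching the server resembles buffered I/O in the standard library. The analogy below uses std::io::BufWriter over a Vec<u8>, where the Vec plays the server; it illustrates the idea and is not a description of ByteWriter's internals. bytes_visible is a hypothetical helper.

```rust
use std::io::{self, BufWriter, Write};

/// Writes `payload` through a BufWriter and reports how many bytes had reached
/// the underlying sink right after the write and again after `flush`.
fn bytes_visible(payload: &[u8]) -> io::Result<(usize, usize)> {
    // Vec<u8> stands in for the server, BufWriter for the client-side buffer.
    let mut writer = BufWriter::new(Vec::new());
    writer.write_all(payload)?; // returns Ok even though nothing reached the sink
    let before_flush = writer.get_ref().len();
    writer.flush()?; // pushes buffered bytes to the sink
    let after_flush = writer.get_ref().len();
    Ok((before_flush, after_flush))
}

fn main() -> io::Result<()> {
    let (before, after) = bytes_visible(b"hello world")?;
    // before flush: 0 bytes, after flush: 11 bytes
    println!("before flush: {} bytes, after flush: {} bytes", before, after);
    Ok(())
}
```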


let payload = "hello world".to_string().into_bytes();
byte_writer.write(&payload).await.expect("write");
// Wait until the server has acknowledged all written data.
byte_writer.flush().await.expect("flush");

Truncate the segment

ByteWriter can also truncate the segment. Truncation means that the data prior to some offset is not needed anymore. Truncated data cannot be read. Applications use truncation to save space on the server.
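Truncation semantics can be modeled with a segment addressed by absolute offsets and a readable head that only moves forward. ToySegment is hypothetical and not part of the client; unlike a real server, it keeps the truncated bytes in memory so that offsets stay absolute.

```rust
/// Toy model of a segment (hypothetical type, not the client's API).
struct ToySegment {
    data: Vec<u8>,
    head: usize, // first readable offset; everything before it is truncated
}

impl ToySegment {
    fn new(data: &[u8]) -> Self {
        ToySegment { data: data.to_vec(), head: 0 }
    }

    /// Moves the readable head forward; it never moves back or past the tail.
    fn truncate_data_before(&mut self, offset: usize) {
        self.head = self.head.max(offset.min(self.data.len()));
    }

    fn current_head(&self) -> usize {
        self.head
    }

    /// Reads up to `len` bytes starting at `offset`; truncated data is an error.
    fn read_at(&self, offset: usize, len: usize) -> Result<&[u8], String> {
        if offset < self.head {
            return Err(format!("offset {} is truncated (head is {})", offset, self.head));
        }
        let end = (offset + len).min(self.data.len());
        Ok(&self.data[offset.min(end)..end])
    }
}

fn main() {
    let mut seg = ToySegment::new(b"hello world");
    seg.truncate_data_before(4);
    println!("head = {}", seg.current_head()); // head = 4
    println!("{}", seg.read_at(0, 4).is_err()); // true
    println!("{}", String::from_utf8_lossy(seg.read_at(4, 4).unwrap())); // o wo
}
```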


// Data before offset 4 becomes unreadable.
byte_writer.truncate_data_before(4).await.expect("truncate segment");

Seal the segment

Sealing a segment makes it read-only: no further data can be written to it.
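A sealed segment can be modeled as one that rejects appends while keeping its existing data readable. SealableSegment and AppendError are hypothetical names used only for this sketch.

```rust
#[derive(Debug, PartialEq)]
enum AppendError {
    Sealed,
}

/// Toy segment (hypothetical): once sealed, appends fail but reads still work.
struct SealableSegment {
    data: Vec<u8>,
    sealed: bool,
}

impl SealableSegment {
    fn new() -> Self {
        SealableSegment { data: Vec::new(), sealed: false }
    }

    fn append(&mut self, bytes: &[u8]) -> Result<usize, AppendError> {
        if self.sealed {
            return Err(AppendError::Sealed);
        }
        self.data.extend_from_slice(bytes);
        Ok(bytes.len())
    }

    fn seal(&mut self) {
        self.sealed = true;
    }

    fn read_all(&self) -> &[u8] {
        &self.data
    }
}

fn main() {
    let mut seg = SealableSegment::new();
    seg.append(b"hello").expect("not sealed yet");
    seg.seal();
    println!("{:?}", seg.append(b"more")); // Err(Sealed)
    println!("{}", String::from_utf8_lossy(seg.read_all())); // hello
}
```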


// After sealing, further writes to this segment are rejected.
byte_writer.seal().await.expect("seal segment");

Create a Byte Reader

Create a ByteReader to read the same segment we just wrote to.


// `create_byte_writer` took ownership of the previous `segment`, so build it again.
let segment = ScopedSegment::from("myscope/mystream/0");
let mut byte_reader = client_factory.create_byte_reader(segment).await;

Read from an offset

ByteReader can seek to any offset in the segment and read from it. It also provides a current_head method that returns the first readable offset. Remember that truncated data can no longer be read, so the current head is the truncation offset.

The read method waits until some data has been fetched from the server. A returned size of 0 means the reader has reached the end of the segment and no more data can be read from this offset.
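The read-until-zero convention is the same one used by std::io::Read, so a std::io::Cursor can stand in for a sealed segment in a sketch of a reading loop. The real ByteReader's read is async and talks to the server; read_to_end_in_chunks is a hypothetical helper that only shows the loop structure.

```rust
use std::io::{Cursor, Read};

/// Drains a reader in 4-byte chunks, stopping when `read` returns 0.
fn read_to_end_in_chunks<R: Read>(reader: &mut R) -> std::io::Result<Vec<u8>> {
    let mut out = Vec::new();
    let mut buf = [0u8; 4];
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            break; // end of data: nothing more can be read from this offset
        }
        out.extend_from_slice(&buf[..n]);
    }
    Ok(out)
}

fn main() -> std::io::Result<()> {
    // The Cursor plays a sealed segment; start at the truncation head (offset 4).
    let mut segment = Cursor::new(b"hello world".to_vec());
    segment.set_position(4);
    let data = read_to_end_in_chunks(&mut segment)?;
    println!("{}", String::from_utf8_lossy(&data)); // o world
    Ok(())
}
```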


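// The first readable offset equals the truncation offset (4 in this example).
let offset = byte_reader.current_head().await.expect("get current head offset");
let mut buf: Vec<u8> = vec![0; 4];
// Reads from the reader's current position; to start at `offset`, seek there
// first (see the ByteReader API docs). Returns 0 at the end of the segment.
let size = byte_reader.read(&mut buf).await.expect("read from byte stream");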

Put it together

use pravega_client_config::ClientConfigBuilder;
use pravega_client::client_factory::ClientFactory;
use pravega_client_shared::ScopedSegment;
use std::io::Write;

#[tokio::main]
async fn main() {
    // Assuming the Pravega controller is listening at localhost:9090.
    let config = ClientConfigBuilder::default()
        .controller_uri("localhost:9090")
        .build()
        .expect("creating config");

    let client_factory = ClientFactory::new(config);

    let segment = ScopedSegment::from("myscope/mystream/0");

    // Write: append after any existing data, then wait for the server's acknowledgement.
    let mut byte_writer = client_factory.create_byte_writer(segment);
    byte_writer.seek_to_tail().await;

    let payload = "hello world".to_string().into_bytes();

    byte_writer.write(&payload).await.expect("write");
    byte_writer.flush().await.expect("flush");

    // Make the data before offset 4 unreadable, then make the segment read-only.
    byte_writer.truncate_data_before(4).await.expect("truncate segment");

    byte_writer.seal().await.expect("seal segment");

    // Read: `create_byte_writer` took ownership of `segment`, so build it again.
    let segment = ScopedSegment::from("myscope/mystream/0");
    let mut byte_reader = client_factory.create_byte_reader(segment).await;

    // The current head is the truncation offset. The read below starts at the
    // reader's current position; seek to `offset` first if needed (see the API docs).
    let offset = byte_reader.current_head().await.expect("get current head offset");
    let mut buf: Vec<u8> = vec![0; 4];
    let size = byte_reader.read(&mut buf).await.expect("read from byte stream");
    println!("head = {:?}, bytes read = {:?}", offset, size);
}

For more details, refer to the API docs.

Developer Corner

Nodejs Client

This section describes:

  • Example of Nodejs client to interact with Pravega.
  • Steps to generate Nodejs Bindings for Pravega from the Rust client.
  • Nodejs API reference.

Pravega Nodejs Client

This project provides a way to interact with Pravega via a Nodejs client.

Pravega is an open source distributed storage service implementing Streams. It offers Stream as the main primitive for the foundation of reliable storage systems: a high-performance, durable, elastic, and unlimited append-only byte stream with strict ordering and consistency.

Install

The client library can be installed using npm or yarn.

npm install @pravega/pravega
# or
yarn add @pravega/pravega

After the package is downloaded from the registry, a node-pre-gyp install is triggered to pull the underlying Rust Node addon binary from the GitHub releases.

Note that your OS and architecture matter: pre-built binaries are provided only for Windows, macOS, and Linux on x86_64. If there is a connection problem or your platform is not supported, you need to build the native Node addon yourself by cloning the repository and running a few commands to produce the binary. These steps are described below in the Install Native Addons Manually and Development sections.

Example

After an npm init, add "type": "module", to your package.json so node can load ECMAScript modules correctly. We do provide a CommonJS distribution for legacy compatibility, so feel free to use require().

import { StreamCut, StreamManager } from '@pravega/pravega';
// or the following if you are still using CommonJS:
// const { StreamCut, StreamManager } = require('@pravega/pravega');

const SCOPE = 'scope1';
const STREAM = 'stream1';
const DATA = 'Hello World!';

// Assume Pravega controller is listening at 127.0.0.1:9090
const stream_manager = StreamManager('tcp://127.0.0.1:9090', false, false, true);
// Assume the scope and stream don't exist.
stream_manager.create_scope(SCOPE);
// This will create a stream with only 1 segment.
stream_manager.create_stream(SCOPE, STREAM);

// Write event as string.
const stream_writer_1 = stream_manager.create_writer(SCOPE, STREAM);
await stream_writer_1.write_event(DATA);
await stream_writer_1.write_event(DATA, 'routing_key');
// Write event as bytes.
const enc = new TextEncoder();
const stream_writer_2 = stream_manager.create_writer(SCOPE, STREAM);
stream_writer_2.write_event_bytes(enc.encode(DATA));
stream_writer_2.write_event_bytes(enc.encode(DATA), 'routing_key');
// You can also write them in parallel and await flush.
await stream_writer_2.flush();

// Write events as a transaction.
const stream_txn_writer = stream_manager.create_transaction_writer(SCOPE, STREAM, BigInt(1));
const txn = await stream_txn_writer.begin_txn();
await txn.write_event(DATA);
await txn.write_event_bytes(enc.encode(DATA), 'routing_key');
// You may commit or abort the transaction.
// Events written in the transaction are not visible to readers until it is committed.
await txn.commit();

// Create a reader group and a reader.
const reader_group_name = Math.random().toString(36).slice(2, 10);
const reader_name = Math.random().toString(36).slice(2, 10);
const stream_reader_group = stream_manager.create_reader_group(
    StreamCut.head(),
    reader_group_name,
    SCOPE,
    STREAM
);
const stream_reader = stream_reader_group.create_reader(reader_name);

// Read data back from stream.
// One `get_segment_slice()` call per segment.
const seg_slice = await stream_reader.get_segment_slice();
const dec = new TextDecoder('utf-8');
for (const event of seg_slice) {
    const raw_bytes = event.data();
    console.log(`Event at ${event.offset()} reads ${dec.decode(raw_bytes)}`);
}
// Release the current slice so that other readers can acquire and read it.
stream_reader.release_segment(seg_slice);
stream_reader.reader_offline();

// Clean up.
stream_manager.delete_reader_group(SCOPE, reader_group_name);
stream_manager.seal_stream(SCOPE, STREAM);
stream_manager.delete_stream(SCOPE, STREAM);
stream_manager.delete_scope(SCOPE);

With pravega-standalone running locally, running the script produces the following output:

$ node --version
v16.15.0
$ node index.js
Event at 0 reads Hello World!
Event at 20 reads Hello World!
Event at 40 reads Hello World!
Event at 60 reads Hello World!
Event at 80 reads Hello World!
Event at 100 reads Hello World!
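The offsets in this output advance by 20 bytes per event, although each "Hello World!" payload is only 12 bytes long. One explanation consistent with the output, assumed here rather than taken from the binding's documentation, is that each event is stored behind an 8-byte header. Under that assumption the offsets can be reproduced as follows; event_offsets and ASSUMED_HEADER_LEN are hypothetical names.

```rust
/// Assumed per-event header size in bytes (inferred from the sample output,
/// where each 12-byte "Hello World!" event advances the offset by 20).
const ASSUMED_HEADER_LEN: u64 = 8;

/// Computes the starting offset of each event in a segment from payload sizes.
fn event_offsets(payload_lens: &[u64]) -> Vec<u64> {
    let mut offsets = Vec::with_capacity(payload_lens.len());
    let mut next = 0;
    for len in payload_lens {
        offsets.push(next);
        next += ASSUMED_HEADER_LEN + len; // header plus payload
    }
    offsets
}

fn main() {
    let payload = "Hello World!".len() as u64; // 12 bytes
    // Six events: two from each writer and two from the transaction.
    let offsets = event_offsets(&[payload; 6]);
    println!("{:?}", offsets); // [0, 20, 40, 60, 80, 100]
}
```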

Supported APIs

Check the Supported APIs Wiki page.

A full API reference may be found here.

Install Native Addons Manually

If you run into problems caused by an unstable GitHub connection, you can first install the package without the prebuilt binary using npm install @pravega/pravega --ignore-scripts, and then manually download the pravega-{tag}-node-v{node_api_version}-{os}-{arch}-{compiler}.tar.gz tarball from the release page. Extract the whole dist folder into ./node_modules/@pravega/pravega and you should be good to go.

Development

To build or test this binding locally, the Rust toolchain must be installed, and cargo build must succeed in the parent project.

Then install the Nodejs dependencies by running npm i in this folder.

Tests

  1. npm run build-debug to build a debug addon.
  2. /path/to/pravega/bin/pravega-standalone to start a standalone Pravega server.
  3. npm test to run the tests.

Local build and install

  1. npm run release-native to build a release addon.
  2. npm run release-js to build a release dist.
  3. npm pack to pack a local npm package.
  4. npm i pravega-x.y.z.tgz in your project and use it.