use crate::client_factory::ClientFactoryAsync;
use crate::segment::raw_client::{RawClient, RawClientError};
use crate::util::get_request_id;
use pravega_client_auth::DelegationTokenProvider;
use pravega_client_retry::retry_async::retry_async;
use pravega_client_retry::retry_result::RetryResult;
use pravega_client_shared::{PravegaNodeUri, Stream as PravegaStream};
use pravega_client_shared::{Scope, ScopedSegment, ScopedStream, Segment};
use pravega_wire_protocol::commands::{
CreateTableSegmentCommand, DeleteTableSegmentCommand, ReadTableCommand, ReadTableEntriesCommand,
ReadTableEntriesDeltaCommand, ReadTableKeysCommand, RemoveTableKeysCommand, TableEntries, TableKey,
TableValue, UpdateTableEntriesCommand,
};
use pravega_wire_protocol::wire_commands::{Replies, Requests};
use async_stream::try_stream;
use futures::stream::Stream;
use serde::Serialize;
use serde_cbor::from_slice;
use serde_cbor::to_vec;
use snafu::Snafu;
use tracing::{debug, info};
/// Version number attached to a table key, as reported by the segment store.
pub type Version = i64;
/// Suffix appended to a table name to build the name of the stream that backs it.
const KVTABLE_SUFFIX: &str = "_kvtable";
/// Errors returned by [`Table`] operations.
///
/// Every variant carries the human-readable name of the `operation` that was
/// being performed when the error was observed.
#[derive(Debug, Snafu)]
pub enum TableError {
/// The request could not be completed by the raw client, even after the
/// configured retry policy was exhausted.
#[snafu(display("Connection error while performing {}: {}", operation, source))]
ConnectionError {
// Every construction site in this file sets this flag to `true`.
can_retry: bool,
operation: String,
source: RawClientError,
},
/// The segment store replied that a key targeted by the operation does not exist.
#[snafu(display("Key does not exist while performing {}: {}", operation, error_msg))]
KeyDoesNotExist { operation: String, error_msg: String },
/// The segment store replied that the table segment `name` does not exist.
#[snafu(display("Table {} does not exist while performing {}", name, operation))]
TableDoesNotExist { operation: String, name: String },
/// The segment store rejected a conditional operation because the supplied
/// key version did not match.
#[snafu(display(
"Incorrect Key version observed while performing {}: {}",
operation,
error_msg
))]
IncorrectKeyVersion { operation: String, error_msg: String },
/// The segment store answered with a reply the operation does not expect.
#[snafu(display("Error observed while performing {} due to {}", operation, error_msg,))]
OperationError { operation: String, error_msg: String },
}
/// Client-side handle to a key-value table stored in a single table segment.
///
/// Keys and values are serialized with CBOR (`serde_cbor`) before being sent
/// to the segment store.
pub struct Table {
// Fully scoped name of the backing table segment (the `ScopedSegment` rendered as a string).
name: String,
// Segment store endpoint resolved once when the table handle was created.
endpoint: PravegaNodeUri,
// Factory used to reach the controller and to create raw clients.
factory: ClientFactoryAsync,
// Supplies (and refreshes) the delegation token attached to each request.
delegation_token_provider: DelegationTokenProvider,
}
impl Table {
/// Deletes the table segment that backs the table `name` in `scope`.
///
/// The request is retried according to the factory's retry policy. Both a
/// `SegmentDeleted` and a `NoSuchSegment` reply are treated as success.
///
/// # Errors
///
/// * [`TableError::ConnectionError`] when the retries are exhausted.
/// * [`TableError::OperationError`] for any other reply.
pub(crate) async fn delete(
    scope: Scope,
    name: String,
    factory: ClientFactoryAsync,
) -> Result<(), TableError> {
    let op = "Delete table segment";
    let stream = PravegaStream::from(format!("{}{}", name, KVTABLE_SUFFIX));
    let segment = ScopedSegment {
        scope,
        stream,
        segment: Segment::from(0),
    };
    info!("deleting table map on {:?}", segment);

    let delegation_token_provider = factory
        .create_delegation_token_provider(ScopedStream::from(&segment))
        .await;

    let reply = retry_async(factory.config().retry_policy, || {
        delete_table_segment(&factory, &segment, &delegation_token_provider)
    })
    .await
    .map_err(|e| TableError::ConnectionError {
        can_retry: true,
        operation: op.to_string(),
        source: e.error,
    })?;

    match reply {
        Replies::SegmentDeleted(..) | Replies::NoSuchSegment(..) => {
            info!("Table segment {:?} deleted", segment);
            Ok(())
        }
        unexpected => Err(TableError::OperationError {
            operation: op.to_string(),
            error_msg: unexpected.to_string(),
        }),
    }
}
pub(crate) async fn new(
scope: Scope,
name: String,
factory: ClientFactoryAsync,
) -> Result<Table, TableError> {
let segment = ScopedSegment {
scope,
stream: PravegaStream::from(format!("{}{}", name, KVTABLE_SUFFIX)),
segment: Segment::from(0),
};
info!("creating table map on {:?}", segment);
let delegation_token_provider = factory
.create_delegation_token_provider(ScopedStream::from(&segment))
.await;
let op = "Create table segment";
retry_async(factory.config().retry_policy, || async {
let req = Requests::CreateTableSegment(CreateTableSegmentCommand {
request_id: get_request_id(),
segment: segment.to_string(),
delegation_token: delegation_token_provider
.retrieve_token(factory.controller_client())
.await,
});
let endpoint = factory
.controller_client()
.get_endpoint_for_segment(&segment)
.await
.expect("get endpoint for segment");
debug!("endpoint is {:?}", endpoint);
let result = factory
.create_raw_client_for_endpoint(endpoint.clone())
.send_request(&req)
.await;
match result {
Ok(reply) => RetryResult::Success((reply, endpoint)),
Err(e) => {
if e.is_token_expired() {
delegation_token_provider.signal_token_expiry();
debug!("auth token needs to refresh");
}
debug!("retry on error {:?}", e);
RetryResult::Retry(e)
}
}
})
.await
.map_err(|e| TableError::ConnectionError {
can_retry: true,
operation: op.to_string(),
source: e.error,
})
.and_then(|(r, endpoint)| match r {
Replies::SegmentCreated(..) | Replies::SegmentAlreadyExists(..) => {
info!("Table segment {:?} created", segment);
let table_map = Table {
name: segment.to_string(),
endpoint,
factory,
delegation_token_provider,
};
Ok(table_map)
}
_ => Err(TableError::OperationError {
operation: op.to_string(),
error_msg: r.to_string(),
}),
})
}
pub async fn get<K, V>(&self, k: &K) -> Result<Option<(V, Version)>, TableError>
where
K: Serialize + serde::de::DeserializeOwned,
V: Serialize + serde::de::DeserializeOwned,
{
let key = to_vec(k).expect("error during serialization.");
let read_result = self.get_raw_values(vec![key]).await;
read_result.map(|v| {
let (l, version) = &v[0];
if l.is_empty() {
None
} else {
let value: V = from_slice(l.as_slice()).expect("error during deserialization");
Some((value, *version))
}
})
}
/// Inserts or updates the value for key `k` without a version condition.
///
/// Delegates to `insert_conditionally` with `TableKey::KEY_NO_VERSION`.
/// `offset` is forwarded unchanged as the request's table segment offset.
///
/// Returns the version assigned to the key by the segment store.
pub async fn insert<K, V>(&self, k: &K, v: &V, offset: i64) -> Result<Version, TableError>
where
    K: Serialize + serde::de::DeserializeOwned,
    V: Serialize + serde::de::DeserializeOwned,
{
    let no_version_check = TableKey::KEY_NO_VERSION;
    self.insert_conditionally(k, v, no_version_check, offset).await
}
pub async fn insert_conditionally<K, V>(
&self,
k: &K,
v: &V,
key_version: Version,
offset: i64,
) -> Result<Version, TableError>
where
K: Serialize + serde::de::DeserializeOwned,
V: Serialize + serde::de::DeserializeOwned,
{
let key = to_vec(k).expect("error during serialization.");
let val = to_vec(v).expect("error during serialization.");
self.insert_raw_values(vec![(key, val, key_version)], offset)
.await
.map(|versions| versions[0])
}
/// Removes key `k` without a version condition.
///
/// Delegates to `remove_conditionally` with `TableKey::KEY_NO_VERSION`.
/// `offset` is forwarded unchanged as the request's table segment offset.
pub async fn remove<K: Serialize + serde::de::DeserializeOwned>(
    &self,
    k: &K,
    offset: i64,
) -> Result<(), TableError> {
    let no_version_check = TableKey::KEY_NO_VERSION;
    self.remove_conditionally(k, no_version_check, offset).await
}
pub async fn remove_conditionally<K>(
&self,
k: &K,
key_version: Version,
offset: i64,
) -> Result<(), TableError>
where
K: Serialize + serde::de::DeserializeOwned,
{
let key = to_vec(k).expect("error during serialization.");
self.remove_raw_values(vec![(key, key_version)], offset).await
}
pub async fn get_all<K, V>(&self, keys: Vec<&K>) -> Result<Vec<Option<(V, Version)>>, TableError>
where
K: Serialize + serde::de::DeserializeOwned,
V: Serialize + serde::de::DeserializeOwned,
{
let keys_raw: Vec<Vec<u8>> = keys
.iter()
.map(|k| to_vec(*k).expect("error during serialization."))
.collect();
let read_result: Result<Vec<(Vec<u8>, Version)>, TableError> = self.get_raw_values(keys_raw).await;
read_result.map(|v| {
v.iter()
.map(|(data, version)| {
if data.is_empty() {
None
} else {
let value: V = from_slice(data.as_slice()).expect("error during deserialization");
Some((value, *version))
}
})
.collect()
})
}
pub async fn insert_all<K, V>(&self, kvps: Vec<(&K, &V)>, offset: i64) -> Result<Vec<Version>, TableError>
where
K: Serialize + serde::de::DeserializeOwned,
V: Serialize + serde::de::DeserializeOwned,
{
let r: Vec<(Vec<u8>, Vec<u8>, Version)> = kvps
.iter()
.map(|(k, v)| {
(
to_vec(k).expect("error during serialization."),
to_vec(v).expect("error during serialization."),
TableKey::KEY_NO_VERSION,
)
})
.collect();
self.insert_raw_values(r, offset).await
}
pub async fn insert_conditionally_all<K, V>(
&self,
kvps: Vec<(&K, &V, Version)>,
offset: i64,
) -> Result<Vec<Version>, TableError>
where
K: Serialize + serde::de::DeserializeOwned,
V: Serialize + serde::de::DeserializeOwned,
{
let r: Vec<(Vec<u8>, Vec<u8>, Version)> = kvps
.iter()
.map(|(k, v, ver)| {
(
to_vec(k).expect("error during serialization."),
to_vec(v).expect("error during serialization."),
*ver,
)
})
.collect();
self.insert_raw_values(r, offset).await
}
/// Removes several keys in one request without a version condition.
///
/// Pairs every key with `TableKey::KEY_NO_VERSION` and delegates to
/// `remove_conditionally_all`.
pub async fn remove_all<K>(&self, keys: Vec<&K>, offset: i64) -> Result<(), TableError>
where
    K: Serialize + serde::de::DeserializeOwned,
{
    let mut unconditional: Vec<(&K, Version)> = Vec::with_capacity(keys.len());
    for key in keys {
        unconditional.push((key, TableKey::KEY_NO_VERSION));
    }
    self.remove_conditionally_all(unconditional, offset).await
}
pub async fn remove_conditionally_all<K>(
&self,
keys: Vec<(&K, Version)>,
offset: i64,
) -> Result<(), TableError>
where
K: Serialize + serde::de::DeserializeOwned,
{
let r: Vec<(Vec<u8>, Version)> = keys
.iter()
.map(|(k, v)| (to_vec(k).expect("error during serialization."), *v))
.collect();
self.remove_raw_values(r, offset).await
}
/// Returns a stream over the keys of the table together with their versions.
///
/// Keys are fetched in batches, with `max_keys_at_once` passed as the
/// suggested batch size. The continuation token of each batch is used for
/// the next read, starting from an empty token. The stream ends when a
/// batch contains no keys; a read error is yielded as `Err` and ends the stream.
///
/// # Panics
///
/// Polling the stream panics if a key cannot be deserialized into `K`.
pub fn read_keys_stream<'stream, 'map: 'stream, K: 'stream>(
    &'map self,
    max_keys_at_once: i32,
) -> impl Stream<Item = Result<(K, Version), TableError>> + 'stream
where
    K: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
{
    try_stream! {
        let mut token: Vec<u8> = Vec::new();
        loop {
            let (keys, next_token): (Vec<(Vec<u8>, Version)>, Vec<u8>) =
                self.read_keys_raw(max_keys_at_once, &token).await?;
            if keys.is_empty() {
                break;
            }
            for (key_raw, version) in keys {
                let key: K = from_slice(key_raw.as_slice()).expect("error during deserialization");
                yield (key, version);
            }
            token = next_token;
        }
    }
}
/// Returns a stream over the entries of the table as
/// `(key, value, version)` tuples.
///
/// Entries are fetched in batches, with `max_entries_at_once` passed as the
/// suggested batch size. The continuation token of each batch is used for
/// the next read, starting from an empty token. The stream ends when a
/// batch contains no entries; a read error is yielded as `Err` and ends the
/// stream.
///
/// # Panics
///
/// Polling the stream panics if a key or value cannot be deserialized.
pub fn read_entries_stream<'stream, 'map: 'stream, K: 'map, V: 'map>(
    &'map self,
    max_entries_at_once: i32,
) -> impl Stream<Item = Result<(K, V, Version), TableError>> + 'stream
where
    K: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
    V: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
{
    try_stream! {
        let mut token: Vec<u8> = Vec::new();
        loop {
            let (entries, next_token): (Vec<(Vec<u8>, Vec<u8>, Version)>, Vec<u8>) =
                self.read_entries_raw(max_entries_at_once, &token).await?;
            if entries.is_empty() {
                break;
            }
            for (key_raw, value_raw, version) in entries {
                let key: K = from_slice(key_raw.as_slice()).expect("error during deserialization");
                let value: V = from_slice(value_raw.as_slice()).expect("error during deserialization");
                yield (key, value, version);
            }
            token = next_token;
        }
    }
}
/// Returns a stream over the entries of the table starting at
/// `from_position`, as `(key, value, version, position)` tuples.
///
/// Entries are fetched in batches, with `max_entries_at_once` passed as the
/// suggested batch size. The `position` yielded with each entry is the last
/// position reported by the batch it came from, and that position is used as
/// the start of the next read. The stream ends when a batch contains no
/// entries; a read error is yielded as `Err` and ends the stream.
///
/// # Panics
///
/// Polling the stream panics if a key or value cannot be deserialized.
pub fn read_entries_stream_from_position<'stream, 'map: 'stream, K: 'map, V: 'map>(
    &'map self,
    max_entries_at_once: i32,
    mut from_position: i64,
) -> impl Stream<Item = Result<(K, V, Version, i64), TableError>> + 'stream
where
    K: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
    V: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
{
    try_stream! {
        loop {
            let (entries, last_position): (Vec<(Vec<u8>, Vec<u8>, Version)>, i64) =
                self.read_entries_raw_delta(max_entries_at_once, from_position).await?;
            if entries.is_empty() {
                break;
            }
            for (key_raw, value_raw, version) in entries {
                let key: K = from_slice(key_raw.as_slice()).expect("error during deserialization");
                let value: V = from_slice(value_raw.as_slice()).expect("error during deserialization");
                yield (key, value, version, last_position);
            }
            from_position = last_position;
        }
    }
}
async fn get_keys<K>(
&self,
max_keys_at_once: i32,
token: &[u8],
) -> Result<(Vec<(K, Version)>, Vec<u8>), TableError>
where
K: Serialize + serde::de::DeserializeOwned,
{
let res = self.read_keys_raw(max_keys_at_once, token).await;
res.map(|(keys, token)| {
let keys_de: Vec<(K, Version)> = keys
.iter()
.map(|(k, version)| {
let key: K = from_slice(k.as_slice()).expect("error during deserialization");
(key, *version)
})
.collect();
(keys_de, token)
})
}
async fn get_entries<K, V>(
&self,
max_entries_at_once: i32,
token: &[u8],
) -> Result<(Vec<(K, V, Version)>, Vec<u8>), TableError>
where
K: Serialize + serde::de::DeserializeOwned,
V: Serialize + serde::de::DeserializeOwned,
{
let res = self.read_entries_raw(max_entries_at_once, token).await;
res.map(|(entries, token)| {
let entries_de: Vec<(K, V, Version)> = entries
.iter()
.map(|(k, v, version)| {
let key: K = from_slice(k.as_slice()).expect("error during deserialization");
let value: V = from_slice(v.as_slice()).expect("error during deserialization");
(key, value, *version)
})
.collect();
(entries_de, token)
})
}
async fn get_entries_delta<K, V>(
&self,
max_entries_at_once: i32,
from_position: i64,
) -> Result<(Vec<(K, V, Version)>, i64), TableError>
where
K: Serialize + serde::de::DeserializeOwned,
V: Serialize + serde::de::DeserializeOwned,
{
let res = self
.read_entries_raw_delta(max_entries_at_once, from_position)
.await;
res.map(|(entries, token)| {
let entries_de: Vec<(K, V, Version)> = entries
.iter()
.map(|(k, v, version)| {
let key: K = from_slice(k.as_slice()).expect("error during deserialization");
let value: V = from_slice(v.as_slice()).expect("error during deserialization");
(key, value, *version)
})
.collect();
(entries_de, token)
})
}
async fn insert_raw_values(
&self,
kvps: Vec<(Vec<u8>, Vec<u8>, Version)>,
offset: i64,
) -> Result<Vec<Version>, TableError> {
let op = "Insert into tablemap";
retry_async(self.factory.config().retry_policy, || async {
let entries: Vec<(TableKey, TableValue)> = kvps
.iter()
.map(|(k, v, ver)| {
let tk = TableKey::new(k.clone(), *ver);
let tv = TableValue::new(v.clone());
(tk, tv)
})
.collect();
let te = TableEntries { entries };
let req = Requests::UpdateTableEntries(UpdateTableEntriesCommand {
request_id: get_request_id(),
segment: self.name.clone(),
delegation_token: self
.delegation_token_provider
.retrieve_token(self.factory.controller_client())
.await,
table_entries: te,
table_segment_offset: offset,
});
let result = self
.factory
.create_raw_client_for_endpoint(self.endpoint.clone())
.send_request(&req)
.await;
match result {
Ok(reply) => RetryResult::Success(reply),
Err(e) => {
if e.is_token_expired() {
self.delegation_token_provider.signal_token_expiry();
info!("auth token needs to refresh");
}
info!("Table insert retry error {:?}", e);
RetryResult::Retry(e)
}
}
})
.await
.map_err(|e| TableError::ConnectionError {
can_retry: true,
operation: op.into(),
source: e.error,
})
.and_then(|r| match r {
Replies::TableEntriesUpdated(c) => Ok(c.updated_versions),
Replies::TableKeyBadVersion(c) => Err(TableError::IncorrectKeyVersion {
operation: op.into(),
error_msg: c.to_string(),
}),
_ => Err(TableError::OperationError {
operation: op.into(),
error_msg: r.to_string(),
}),
})
}
/// Sends a `ReadTable` request for already-serialized keys.
///
/// Every key is sent with `TableKey::KEY_NO_VERSION`. The request is rebuilt
/// and resent on every retry attempt; a token-expiry error signals the
/// delegation token provider before the retry.
///
/// Returns one `(value bytes, key version)` tuple per entry in the reply.
/// An empty `keys` list returns an empty result without contacting the
/// segment store.
///
/// # Errors
///
/// * [`TableError::ConnectionError`] when the retries are exhausted.
/// * [`TableError::OperationError`] when the reply is not `TableRead`, or
///   when it is a `TableRead` with no entries for a non-empty request.
async fn get_raw_values(&self, keys: Vec<Vec<u8>>) -> Result<Vec<(Vec<u8>, Version)>, TableError> {
    let op = "Read from tablemap";

    // Nothing was asked for, so there is nothing to read.
    if keys.is_empty() {
        return Ok(Vec::new());
    }

    let reply = retry_async(self.factory.config().retry_policy, || async {
        let table_keys: Vec<TableKey> = keys
            .iter()
            .map(|k| TableKey::new(k.clone(), TableKey::KEY_NO_VERSION))
            .collect();
        let req = Requests::ReadTable(ReadTableCommand {
            request_id: get_request_id(),
            segment: self.name.clone(),
            delegation_token: self
                .delegation_token_provider
                .retrieve_token(self.factory.controller_client())
                .await,
            keys: table_keys,
        });
        let result = self
            .factory
            .create_raw_client_for_endpoint(self.endpoint.clone())
            .send_request(&req)
            .await;
        debug!("Read Response {:?}", result);
        match result {
            Ok(reply) => RetryResult::Success(reply),
            Err(e) => {
                if e.is_token_expired() {
                    self.delegation_token_provider.signal_token_expiry();
                    info!("auth token needs to refresh");
                }
                RetryResult::Retry(e)
            }
        }
    })
    .await
    .map_err(|e| TableError::ConnectionError {
        can_retry: true,
        operation: op.into(),
        source: e.error,
    })?;

    match reply {
        Replies::TableRead(c) => {
            let entries: Vec<(TableKey, TableValue)> = c.entries.entries;
            if entries.is_empty() {
                // A malformed reply is a recoverable condition for the
                // caller, so it is reported as an error instead of a panic.
                return Err(TableError::OperationError {
                    operation: op.into(),
                    error_msg: "Invalid response from the Segment store".into(),
                });
            }
            Ok(entries
                .into_iter()
                .map(|(key, value)| (value.data, key.key_version))
                .collect())
        }
        unexpected => Err(TableError::OperationError {
            operation: op.into(),
            error_msg: unexpected.to_string(),
        }),
    }
}
async fn remove_raw_values(&self, keys: Vec<(Vec<u8>, Version)>, offset: i64) -> Result<(), TableError> {
let op = "Remove keys from table";
retry_async(self.factory.config().retry_policy, || async {
let tks: Vec<TableKey> = keys
.iter()
.map(|(k, ver)| TableKey::new(k.clone(), *ver))
.collect();
let req = Requests::RemoveTableKeys(RemoveTableKeysCommand {
request_id: get_request_id(),
segment: self.name.clone(),
delegation_token: self
.delegation_token_provider
.retrieve_token(self.factory.controller_client())
.await,
keys: tks,
table_segment_offset: offset,
});
let result = self
.factory
.create_raw_client_for_endpoint(self.endpoint.clone())
.send_request(&req)
.await;
debug!("Reply for RemoveTableKeys request {:?}", result);
match result {
Ok(reply) => RetryResult::Success(reply),
Err(e) => {
if e.is_token_expired() {
self.delegation_token_provider.signal_token_expiry();
debug!("auth token needs to refresh");
}
debug!("retry on error {:?}", e);
RetryResult::Retry(e)
}
}
})
.await
.map_err(|e| TableError::ConnectionError {
can_retry: true,
operation: op.into(),
source: e.error,
})
.and_then(|r| match r {
Replies::TableKeysRemoved(..) => Ok(()),
Replies::TableKeyBadVersion(c) => Err(TableError::IncorrectKeyVersion {
operation: op.into(),
error_msg: c.to_string(),
}),
Replies::TableKeyDoesNotExist(c) => Err(TableError::KeyDoesNotExist {
operation: op.into(),
error_msg: c.to_string(),
}),
_ => Err(TableError::OperationError {
operation: op.into(),
error_msg: r.to_string(),
}),
})
}
/// Sends a `ReadTableKeys` request and returns the raw keys of one batch.
///
/// `max_keys_at_once` is passed as the suggested key count and `token` as
/// the continuation token. The request is rebuilt and resent on every retry
/// attempt; a token-expiry error signals the delegation token provider
/// before the retry.
///
/// Returns `(key bytes, key version)` tuples and the continuation token from
/// the reply.
///
/// # Errors
///
/// * [`TableError::ConnectionError`] when the retries are exhausted.
/// * [`TableError::OperationError`] for any reply other than `TableKeysRead`.
async fn read_keys_raw(
    &self,
    max_keys_at_once: i32,
    token: &[u8],
) -> Result<(Vec<(Vec<u8>, Version)>, Vec<u8>), TableError> {
    let op = "Read keys";

    let reply = retry_async(self.factory.config().retry_policy, || async {
        let request_id = get_request_id();
        let delegation_token = self
            .delegation_token_provider
            .retrieve_token(self.factory.controller_client())
            .await;
        let req = Requests::ReadTableKeys(ReadTableKeysCommand {
            request_id,
            segment: self.name.clone(),
            delegation_token,
            suggested_key_count: max_keys_at_once,
            continuation_token: token.to_vec(),
        });

        let result = self
            .factory
            .create_raw_client_for_endpoint(self.endpoint.clone())
            .send_request(&req)
            .await;

        let err = match result {
            Ok(reply) => return RetryResult::Success(reply),
            Err(err) => err,
        };
        if err.is_token_expired() {
            self.delegation_token_provider.signal_token_expiry();
            info!("auth token needs to refresh");
        }
        RetryResult::Retry(err)
    })
    .await
    .map_err(|e| TableError::ConnectionError {
        can_retry: true,
        operation: op.into(),
        source: e.error,
    })?;

    match reply {
        Replies::TableKeysRead(c) => {
            let mut keys: Vec<(Vec<u8>, Version)> = Vec::with_capacity(c.keys.len());
            for table_key in c.keys.iter() {
                keys.push((table_key.data.clone(), table_key.key_version));
            }
            Ok((keys, c.continuation_token))
        }
        unexpected => Err(TableError::OperationError {
            operation: op.into(),
            error_msg: unexpected.to_string(),
        }),
    }
}
/// Sends a `ReadTableEntries` request and returns the raw entries of one batch.
///
/// `max_entries_at_once` is passed as the suggested entry count and `token`
/// as the continuation token. The request is rebuilt and resent on every
/// retry attempt; a token-expiry error signals the delegation token provider
/// before the retry.
///
/// Returns `(key bytes, value bytes, key version)` tuples and the
/// continuation token from the reply.
///
/// # Errors
///
/// * [`TableError::ConnectionError`] when the retries are exhausted.
/// * [`TableError::OperationError`] for any reply other than `TableEntriesRead`.
async fn read_entries_raw(
    &self,
    max_entries_at_once: i32,
    token: &[u8],
) -> Result<(Vec<(Vec<u8>, Vec<u8>, Version)>, Vec<u8>), TableError> {
    let op = "Read entries";

    let reply = retry_async(self.factory.config().retry_policy, || async {
        let request_id = get_request_id();
        let delegation_token = self
            .delegation_token_provider
            .retrieve_token(self.factory.controller_client())
            .await;
        let req = Requests::ReadTableEntries(ReadTableEntriesCommand {
            request_id,
            segment: self.name.clone(),
            delegation_token,
            suggested_entry_count: max_entries_at_once,
            continuation_token: token.to_vec(),
        });

        let result = self
            .factory
            .create_raw_client_for_endpoint(self.endpoint.clone())
            .send_request(&req)
            .await;
        debug!("Reply for read tableEntries request {:?}", result);

        let err = match result {
            Ok(reply) => return RetryResult::Success(reply),
            Err(err) => err,
        };
        if err.is_token_expired() {
            self.delegation_token_provider.signal_token_expiry();
            info!("auth token needs to refresh");
        }
        RetryResult::Retry(err)
    })
    .await
    .map_err(|e| TableError::ConnectionError {
        can_retry: true,
        operation: op.into(),
        source: e.error,
    })?;

    match reply {
        Replies::TableEntriesRead(c) => {
            let mut entries: Vec<(Vec<u8>, Vec<u8>, Version)> =
                Vec::with_capacity(c.entries.entries.len());
            for (key, value) in c.entries.entries.iter() {
                entries.push((key.data.clone(), value.data.clone(), key.key_version));
            }
            Ok((entries, c.continuation_token))
        }
        unexpected => Err(TableError::OperationError {
            operation: op.into(),
            error_msg: unexpected.to_string(),
        }),
    }
}
/// Sends a `ReadTableEntriesDelta` request and returns the raw entries of
/// one batch starting at `from_position`.
///
/// `max_entries_at_once` is passed as the suggested entry count. The request
/// is rebuilt and resent on every retry attempt; a token-expiry error
/// signals the delegation token provider before the retry.
///
/// Returns `(key bytes, value bytes, key version)` tuples and the last
/// position reported by the reply.
///
/// # Errors
///
/// * [`TableError::ConnectionError`] when the retries are exhausted.
/// * [`TableError::TableDoesNotExist`] on a `NoSuchSegment` reply.
/// * [`TableError::OperationError`] for any other reply; the message carries
///   the reply that was received.
async fn read_entries_raw_delta(
    &self,
    max_entries_at_once: i32,
    from_position: i64,
) -> Result<(Vec<(Vec<u8>, Vec<u8>, Version)>, i64), TableError> {
    let op = "Read entries delta";

    let reply = retry_async(self.factory.config().retry_policy, || async {
        let req = Requests::ReadTableEntriesDelta(ReadTableEntriesDeltaCommand {
            request_id: get_request_id(),
            segment: self.name.clone(),
            delegation_token: self
                .delegation_token_provider
                .retrieve_token(self.factory.controller_client())
                .await,
            from_position,
            suggested_entry_count: max_entries_at_once,
        });
        let result = self
            .factory
            .create_raw_client_for_endpoint(self.endpoint.clone())
            .send_request(&req)
            .await;
        match result {
            Ok(reply) => RetryResult::Success(reply),
            Err(e) => {
                if e.is_token_expired() {
                    self.delegation_token_provider.signal_token_expiry();
                    info!("auth token needs to refresh");
                }
                RetryResult::Retry(e)
            }
        }
    })
    .await
    .map_err(|e| TableError::ConnectionError {
        can_retry: true,
        operation: op.into(),
        source: e.error,
    })?;

    match reply {
        Replies::TableEntriesDeltaRead(c) => {
            let entries: Vec<(Vec<u8>, Vec<u8>, Version)> = c
                .entries
                .entries
                .iter()
                .map(|(k, v)| (k.data.clone(), v.data.clone(), k.key_version))
                .collect();
            Ok((entries, c.last_position))
        }
        Replies::NoSuchSegment(c) => {
            debug!("Received NoSuchSegment, the table segment is deleted {:?}", c);
            Err(TableError::TableDoesNotExist {
                operation: op.into(),
                name: c.segment,
            })
        }
        // Report the actual reply, as every other table operation does, so
        // the failure can be diagnosed from the error alone.
        unexpected => Err(TableError::OperationError {
            operation: op.into(),
            error_msg: unexpected.to_string(),
        }),
    }
}
}
async fn delete_table_segment(
factory: &ClientFactoryAsync,
segment: &ScopedSegment,
delegation_token_provider: &DelegationTokenProvider,
) -> RetryResult<Replies, RawClientError> {
let req = Requests::DeleteTableSegment(DeleteTableSegmentCommand {
request_id: get_request_id(),
segment: segment.to_string(),
must_be_empty: false,
delegation_token: delegation_token_provider
.retrieve_token(factory.controller_client())
.await,
});
let endpoint = factory
.controller_client()
.get_endpoint_for_segment(segment)
.await
.expect("get endpoint for segment");
debug!("endpoint is {:?}", endpoint);
let result = factory
.create_raw_client_for_endpoint(endpoint.clone())
.send_request(&req)
.await;
match result {
Ok(reply) => RetryResult::Success(reply),
Err(e) => {
if e.is_token_expired() {
delegation_token_provider.signal_token_expiry();
debug!("auth token needs to refresh");
}
debug!("retry on error {:?}", e);
RetryResult::Retry(e)
}
}
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::client_factory::ClientFactory;
    use pravega_client_config::connection_type::{ConnectionType, MockType};
    use pravega_client_config::ClientConfigBuilder;
    use pravega_client_shared::PravegaNodeUri;
    use tokio::runtime::Runtime;

    /// Unconditional inserts bump the key version; after removal the key is gone.
    #[test]
    fn test_table_map_unconditional_insert_and_remove() {
        let mut rt = Runtime::new().unwrap();
        let table_map = create_table_map(&mut rt);
        let key = "key".to_string();
        let value = "value".to_string();

        let version = rt
            .block_on(table_map.insert(&key, &value, -1))
            .expect("unconditionally insert into table map");
        assert_eq!(version, 0);

        let version = rt
            .block_on(table_map.insert(&key, &value, -1))
            .expect("unconditionally insert into table map");
        assert_eq!(version, 1);

        rt.block_on(table_map.remove(&key, -1)).expect("remove key");

        let option: Option<(String, Version)> = rt
            .block_on(table_map.get(&key))
            .expect("get key after removal");
        assert!(option.is_none());
    }

    /// Conditional inserts succeed only with the current version; a stale
    /// version is rejected, and a conditional remove with the current version
    /// deletes the key.
    #[test]
    fn test_table_map_conditional_insert_and_remove() {
        let mut rt = Runtime::new().unwrap();
        let table_map = create_table_map(&mut rt);
        let key = "key".to_string();
        let value = "value".to_string();

        let version = rt
            .block_on(table_map.insert_conditionally(&key, &value, -1, -1))
            .expect("unconditionally insert into table map");
        assert_eq!(version, 0);

        let version = rt
            .block_on(table_map.insert_conditionally(&key, &value, 0, -1))
            .expect("conditionally insert into table map");
        assert_eq!(version, 1);

        // Version 0 is stale now, so the insert must be rejected.
        let result = rt.block_on(table_map.insert_conditionally(&key, &value, 0, -1));
        assert!(result.is_err());

        let result = rt.block_on(table_map.remove_conditionally(&key, 1, -1));
        assert!(result.is_ok());

        let option: Option<(String, Version)> = rt
            .block_on(table_map.get(&key))
            .expect("get key after removal");
        assert!(option.is_none());
    }

    /// Batch insert followed by batch conditional remove leaves no keys behind.
    #[test]
    fn test_table_map_insert_remove_all() {
        let mut rt = Runtime::new().unwrap();
        let table_map = create_table_map(&mut rt);
        let k1 = "k1".to_string();
        let v1 = "v1".to_string();
        let k2 = "k2".to_string();
        let v2 = "v2".to_string();

        let kvs = vec![(&k1, &v1, -1), (&k2, &v2, -1)];
        let version = rt
            .block_on(table_map.insert_conditionally_all(kvs, -1))
            .expect("unconditionally insert all into table map");
        let expected = vec![0, 0];
        assert_eq!(version, expected);

        let ks = vec![(&k1, 0), (&k2, 0)];
        rt.block_on(table_map.remove_conditionally_all(ks, -1))
            .expect("conditionally remove all from table map");

        let option: Option<(String, Version)> = rt
            .block_on(table_map.get(&k1))
            .expect("get k1 after removal");
        assert!(option.is_none());

        let option: Option<(String, Version)> = rt
            .block_on(table_map.get(&k2))
            .expect("get k2 after removal");
        assert!(option.is_none());
    }

    /// Builds a `Table` backed by the mock ("happy path") connection type.
    fn create_table_map(rt: &mut Runtime) -> Table {
        let config = ClientConfigBuilder::default()
            .connection_type(ConnectionType::Mock(MockType::Happy))
            .mock(true)
            .controller_uri(PravegaNodeUri::from("127.0.0.2:9091"))
            .build()
            .unwrap();
        let factory = ClientFactory::new(config);
        let scope = Scope {
            name: "tablemapScope".to_string(),
        };
        rt.block_on(factory.create_table(scope, "tablemap".to_string()))
    }
}