use crate::client_factory::ClientFactoryAsync;
use crate::event::reader_group::ReaderGroupConfigVersioned;
use crate::sync::synchronizer::*;
use pravega_client_shared::{Reader, Scope, ScopedSegment, Segment, SegmentWithRange};
use crate::sync::table::TableError;
#[cfg(test)]
use mockall::automock;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use tracing::{debug, info, warn};
// Assumed reader lag in milliseconds when no better estimate is available.
// NOTE(review): not referenced anywhere in this file — presumably consumed by
// the reader-balancing logic elsewhere; confirm before removing.
const ASSUMED_LAG_MILLIS: u64 = 30000;
// Inner-map key used for singleton entries (e.g. the reader group config).
const DEFAULT_INNER_KEY: &str = "default";
// Outer-map key: reader name -> serialized HashMap<ScopedSegment, Offset> of owned segments.
const ASSIGNED: &str = "assigned_segments";
// Outer-map key: segment string -> serialized Offset, for segments owned by no reader.
const UNASSIGNED: &str = "unassigned_segments";
// Outer-map key: successor segment -> serialized set of predecessor segments
// that must complete before the successor becomes readable.
const FUTURE: &str = "future_segments";
// Outer-map key: reader name -> distance to the stream tail (u64).
const DISTANCE: &str = "distance_to_tail";
/// Errors raised while reading or updating reader group state through the
/// underlying table synchronizer.
#[derive(Debug, Snafu)]
pub enum ReaderGroupStateError {
    /// A synchronizer operation failed while performing the described update.
    #[snafu(display("Synchronizer error while performing {}: {}", error_msg, source))]
    SyncError {
        error_msg: String,
        source: SynchronizerError,
    },
    /// The targeted reader was already offline (a precondition failure).
    ReaderAlreadyOfflineError {
        error_msg: String,
        source: SynchronizerError,
    },
}
impl ReaderGroupStateError {
    /// Reports whether this error represents a precondition failure.
    ///
    /// NOTE(review): this returns `true` for every variant, including
    /// `SyncError` cases that wrap non-precondition failures — confirm whether
    /// callers rely on that blanket behavior or whether this should inspect
    /// the variant / wrapped `SynchronizerError`.
    pub fn is_precondition(&self) -> bool {
        true
    }
}
/// Shared state of a reader group, persisted in a Pravega table and kept in
/// sync across reader processes via a table [`Synchronizer`].
pub struct ReaderGroupState {
    // Table synchronizer backing all reads/updates of the group state.
    sync: Synchronizer,
}
#[cfg_attr(test, automock)]
impl ReaderGroupState {
/// Creates (or attaches to) the reader group state table.
///
/// If the table is empty, seeds it with the serialized config and puts every
/// initial segment into the unassigned pool at its starting offset; otherwise
/// the existing state is left untouched.
pub(crate) async fn new(
    scope: Scope,
    reader_group_name: String,
    client_factory: &ClientFactoryAsync,
    config: ReaderGroupConfigVersioned,
    segments_to_offsets: HashMap<ScopedSegment, Offset>,
) -> ReaderGroupState {
    let mut synchronizer = client_factory
        .create_synchronizer(scope, reader_group_name.clone())
        .await;
    synchronizer
        .insert(move |table| {
            if !table.is_empty() {
                // Another reader already initialized the state; nothing to seed.
                info!("ReaderGroup {:?} already initialized", reader_group_name);
            } else {
                table.insert(
                    "config".to_owned(),
                    DEFAULT_INNER_KEY.to_owned(),
                    "ReaderGroupConfigVersioned".to_owned(),
                    Box::new(config.clone()),
                );
                for (segment, offset) in &segments_to_offsets {
                    table.insert(
                        UNASSIGNED.to_owned(),
                        segment.to_string(),
                        "Offset".to_owned(),
                        Box::new(offset.to_owned()),
                    );
                }
            }
            Ok(())
        })
        .await
        .expect("should initialize table synchronizer");
    ReaderGroupState { sync: synchronizer }
}
/// Adds a reader to the reader group.
///
/// # Errors
/// Returns `SyncError` if the synchronizer update fails (including when the
/// reader is already online, which the internal update treats as a
/// precondition failure).
pub async fn add_reader(&mut self, reader: &Reader) -> Result<(), ReaderGroupStateError> {
    info!("Adding reader {:?} to reader group", reader);
    let outcome = self
        .sync
        .insert(|table| ReaderGroupState::add_reader_internal(table, reader))
        .await;
    outcome
        .context(SyncError {
            error_msg: format!("add reader {:?}", reader),
        })
        .map(|_| ())
}
/// Table-update closure body for `add_reader`: registers the reader with an
/// empty owned-segment map and an unknown (maximal) distance to tail.
fn add_reader_internal(table: &mut Update, reader: &Reader) -> Result<(), SynchronizerError> {
    // A reader with an entry in the assigned map is considered online already.
    if table.contains_key(ASSIGNED, &reader.to_string()) {
        return Err(SynchronizerError::SyncPreconditionError {
            error_msg: format!("Failed to add online reader {:?}: reader already online", reader),
        });
    }
    let no_segments: HashMap<ScopedSegment, Offset> = HashMap::new();
    table.insert(
        ASSIGNED.to_owned(),
        reader.to_string(),
        "HashMap<ScopedSegment, Offset>".to_owned(),
        Box::new(no_segments),
    );
    table.insert(
        DISTANCE.to_owned(),
        reader.to_string(),
        "u64".to_owned(),
        Box::new(u64::MAX),
    );
    Ok(())
}
/// Returns `true` if `reader` is currently online (has a live, non-tombstoned
/// entry in the assigned map). Returns `false` if the backing table no longer
/// exists; panics on any other fetch failure.
pub async fn check_online(&mut self, reader: &Reader) -> bool {
    match self.sync.fetch_updates().await {
        Ok(count) => debug!("Number of updates read is {:?}", count),
        Err(TableError::TableDoesNotExist { .. }) => return false,
        _ => panic!("Fetch updates failed after all retries"),
    }
    self.sync
        .get_inner_map(ASSIGNED)
        .get(&reader.name)
        .map_or(false, |value| value.type_id != TOMBSTONE)
}
/// Returns the list of readers currently registered in the assigned map.
pub async fn get_online_readers(&mut self) -> Vec<Reader> {
    self.sync.fetch_updates().await.expect("should fetch updates");
    let assigned = self.sync.get_inner_map(ASSIGNED);
    ReaderGroupState::get_online_readers_internal(assigned)
}
/// Converts the keys of the assigned inner map into `Reader` values.
fn get_online_readers_internal(assigned_segments: HashMap<String, Value>) -> Vec<Reader> {
    assigned_segments
        .into_iter()
        .map(|(name, _value)| Reader::from(name))
        .collect()
}
/// Returns the segment -> offset map currently owned by `reader`.
///
/// # Errors
/// Returns `ReaderAlreadyOfflineError` if the reader is no longer online.
pub(crate) async fn get_reader_positions(
    &mut self,
    reader: &Reader,
) -> Result<HashMap<ScopedSegment, Offset>, ReaderGroupStateError> {
    self.sync.fetch_updates().await.expect("should fetch updates");
    // Build the assigned inner map once instead of twice (it was previously
    // materialized separately for the log line and the lookup). Also fixes the
    // "Assaigned" typo in the log message.
    let assigned = self.sync.get_inner_map(ASSIGNED);
    debug!("Assigned segments {:?} for reader {:?}", assigned, reader);
    ReaderGroupState::get_reader_positions_internal(reader, assigned)
}
/// Deserializes the owned-segment map for `reader` from the assigned inner
/// map, failing if the reader is not online.
fn get_reader_positions_internal(
    reader: &Reader,
    assigned_segments: HashMap<String, Value>,
) -> Result<HashMap<ScopedSegment, Offset>, ReaderGroupStateError> {
    ReaderGroupState::check_reader_online(&assigned_segments, reader).context(
        ReaderAlreadyOfflineError {
            error_msg: format!("reader {:?} already offline", reader),
        },
    )?;
    // check_reader_online guarantees the entry exists at this point.
    let entry = assigned_segments
        .get(&reader.name)
        .expect("reader must exist");
    let positions = deserialize_from(&entry.data).expect("Failed to deserialize reader position");
    Ok(positions)
}
/// Persists the latest read positions for the segments owned by `reader`.
pub(crate) async fn update_reader_positions(
    &mut self,
    reader: &Reader,
    latest_positions: HashMap<ScopedSegment, Offset>,
) -> Result<(), ReaderGroupStateError> {
    let outcome = self
        .sync
        .insert(|table| {
            ReaderGroupState::update_reader_positions_internal(table, reader, &latest_positions)
        })
        .await;
    outcome.context(SyncError {
        error_msg: format!("update reader {:?} to position {:?}", reader, latest_positions),
    })
}
/// Table-update closure body for `update_reader_positions`: copies the latest
/// read offsets onto the reader's stored owned-segment map and writes it back.
fn update_reader_positions_internal(
    table: &mut Update,
    reader: &Reader,
    latest_positions: &HashMap<ScopedSegment, Offset>,
) -> Result<(), SynchronizerError> {
    let mut owned_segments = ReaderGroupState::get_reader_owned_segments_from_table(table, reader)?;
    if owned_segments.len() != latest_positions.len() {
        // Fixed log typo: "dose" -> "does".
        warn!(
            "owned segments size {} does not match latest positions size {}",
            owned_segments.len(),
            latest_positions.len()
        );
    }
    // Only offsets for segments the reader actually owns are updated; positions
    // reported for unowned segments are intentionally ignored (no or_insert).
    for (segment, offset) in latest_positions {
        owned_segments.entry(segment.to_owned()).and_modify(|v| {
            v.read = offset.read;
        });
    }
    table.insert(
        ASSIGNED.to_owned(),
        reader.to_string(),
        "HashMap<ScopedSegment, Offset>".to_owned(),
        Box::new(owned_segments),
    );
    Ok(())
}
/// Removes `reader` from the group, returning its owned segments to the
/// unassigned pool at their last stored offsets.
///
/// # Errors
/// `ReaderAlreadyOfflineError` if the reader was already offline; `SyncError`
/// for any other synchronizer failure.
pub(crate) async fn remove_reader_default(
    &mut self,
    reader: &Reader,
) -> Result<(), ReaderGroupStateError> {
    let outcome = self
        .sync
        .insert(|table| ReaderGroupState::remove_reader_internal_default(table, reader))
        .await;
    match outcome {
        Ok(_) => Ok(()),
        // A precondition failure means the reader entry was already gone.
        Err(err @ SynchronizerError::SyncPreconditionError { .. }) => {
            Err(ReaderGroupStateError::ReaderAlreadyOfflineError {
                error_msg: format!("Reader {:?} already offline", reader),
                source: err,
            })
        }
        Err(err) => Err(ReaderGroupStateError::SyncError {
            error_msg: format!(
                "remove reader {:?} and put known owned segments to unassigned list",
                reader
            ),
            source: err,
        }),
    }
}
/// Table-update closure body for `remove_reader_default`: hands every segment
/// the reader owned back to the unassigned pool at its stored offset, then
/// tombstones the reader's own entries.
fn remove_reader_internal_default(table: &mut Update, reader: &Reader) -> Result<(), SynchronizerError> {
    for (segment, pos) in ReaderGroupState::get_reader_owned_segments_from_table(table, reader)? {
        table.insert(
            UNASSIGNED.to_owned(),
            segment.to_string(),
            "Offset".to_owned(),
            Box::new(pos),
        );
    }
    table.insert_tombstone(ASSIGNED.to_owned(), reader.to_string())?;
    table.insert_tombstone(DISTANCE.to_owned(), reader.to_string())?;
    Ok(())
}
/// Removes `reader` from the group, returning its owned segments to the
/// unassigned pool at the caller-supplied offsets (falling back to the stored
/// offsets for segments not present in `owned_segments`).
///
/// # Errors
/// `ReaderAlreadyOfflineError` if the reader was already offline; `SyncError`
/// for any other synchronizer failure.
pub(crate) async fn remove_reader(
    &mut self,
    reader: &Reader,
    owned_segments: HashMap<ScopedSegment, Offset>,
) -> Result<(), ReaderGroupStateError> {
    let outcome = self
        .sync
        .insert(|table| ReaderGroupState::remove_reader_internal(table, reader, &owned_segments))
        .await;
    match outcome {
        Ok(_) => Ok(()),
        // A precondition failure means the reader entry was already gone.
        Err(err @ SynchronizerError::SyncPreconditionError { .. }) => {
            Err(ReaderGroupStateError::ReaderAlreadyOfflineError {
                error_msg: format!("Reader {:?} already offline", reader),
                source: err,
            })
        }
        Err(err) => Err(ReaderGroupStateError::SyncError {
            error_msg: format!(
                "remove reader {:?} and put known owned segments to unassigned list",
                reader
            ),
            source: err,
        }),
    }
}
/// Table-update closure body for `remove_reader`: releases every segment the
/// reader owned into the unassigned pool, preferring the caller-supplied
/// offset over the stored one, then tombstones the reader's entries.
fn remove_reader_internal(
    table: &mut Update,
    reader: &Reader,
    owned_segments: &HashMap<ScopedSegment, Offset>,
) -> Result<(), SynchronizerError> {
    let stored = ReaderGroupState::get_reader_owned_segments_from_table(table, reader)?;
    for (segment, stored_pos) in stored {
        let offset = match owned_segments.get(&segment) {
            Some(caller_pos) => caller_pos.to_owned(),
            None => stored_pos,
        };
        table.insert(
            UNASSIGNED.to_owned(),
            segment.to_string(),
            "Offset".to_owned(),
            Box::new(offset),
        );
    }
    table.insert_tombstone(ASSIGNED.to_owned(), reader.to_string())?;
    table.insert_tombstone(DISTANCE.to_owned(), reader.to_string())?;
    Ok(())
}
/// Computes how many segments `reader` should acquire (positive result) or
/// release (negative result) to balance segments evenly across the group's
/// online readers.
///
/// # Errors
/// Returns `ReaderAlreadyOfflineError` when the backing reader group table has
/// been deleted. Panics if fetching updates fails for any other reason.
pub async fn compute_segments_to_acquire_or_release(
    &mut self,
    reader: &Reader,
) -> Result<isize, ReaderGroupStateError> {
    match self.sync.fetch_updates().await {
        Ok(update_count) => debug!("Number of updates read is {:?}", update_count),
        Err(TableError::TableDoesNotExist { .. }) => {
            return Err(ReaderGroupStateError::ReaderAlreadyOfflineError {
                error_msg: String::from("the ReaderGroup is deleted"),
                source: SynchronizerError::SyncPreconditionError {
                    error_msg: String::from("Precondition failure"),
                },
            })
        }
        _ => panic!("Fetch updates failed after all retries"),
    }
    let assigned_segment_map = self.sync.get_inner_map(ASSIGNED);
    let num_of_readers = assigned_segment_map.len();
    // Guard the divisions below: with no online readers there is nothing for
    // this reader to acquire or release (previously this divided by zero and
    // panicked).
    if num_of_readers == 0 {
        return Ok(0);
    }
    let mut num_assigned_segments = 0;
    for v in assigned_segment_map.values() {
        let segments: HashMap<ScopedSegment, Offset> =
            deserialize_from(&v.data).expect("deserialize assigned segments");
        num_assigned_segments += segments.len();
    }
    let num_of_segments = num_assigned_segments + self.sync.get_inner_map(UNASSIGNED).len();
    debug!(
        " number of segments {:?}, number of readers {:?} in reader group state",
        num_of_segments, num_of_readers
    );
    // Each reader should own ceil(num_of_segments / num_of_readers) segments.
    let num_segments_per_reader = num_of_segments / num_of_readers;
    let expected_segment_count_per_reader = if num_of_segments % num_of_readers == 0 {
        num_segments_per_reader
    } else {
        num_segments_per_reader + 1
    };
    let current_segment_count = assigned_segment_map.get(&reader.name).map(|v| {
        let seg: HashMap<ScopedSegment, Offset> =
            deserialize_from(&v.data).expect("deserialize of assigned segments");
        seg.len()
    });
    let expected: isize = expected_segment_count_per_reader.try_into().unwrap();
    let current: isize = current_segment_count.unwrap_or_default().try_into().unwrap();
    Ok(expected - current)
}
pub async fn get_segments(&mut self) -> HashSet<ScopedSegment> {
self.sync.fetch_updates().await.expect("should fetch updates");
let assigned_segments = self.sync.get_inner_map(ASSIGNED);
let unassigned_segments = self.sync.get_inner_map(UNASSIGNED);
println!(
"Assigned Segments {:?} Unassigned Segment {:?}",
assigned_segments.len(),
unassigned_segments.len()
);
let mut set: HashSet<ScopedSegment> = HashSet::new();
for v in assigned_segments.values() {
let segments: HashMap<ScopedSegment, Offset> =
deserialize_from(&v.data).expect("deserialize assigned segments");
set.extend(segments.keys().cloned().collect::<HashSet<ScopedSegment>>())
}
set.extend(
unassigned_segments
.keys()
.map(|segment| {
let segment_str = &*segment.to_owned();
ScopedSegment::from(segment_str)
})
.collect::<HashSet<ScopedSegment>>(),
);
set
}
/// Builds a stream cut: every known segment mapped to its current offset,
/// combining readers' assigned positions with the unassigned pool.
pub async fn get_streamcut(&mut self) -> HashMap<ScopedSegment, Offset> {
    self.sync.fetch_updates().await.expect("should fetch updates");
    let assigned_segments = self.sync.get_inner_map(ASSIGNED);
    let unassigned_segments = self.sync.get_inner_map(UNASSIGNED);
    info!(
        "Assigned Segments {:?} Unassigned Segment {:?}",
        assigned_segments.len(),
        unassigned_segments.len()
    );
    // Merge every reader's owned map first, then overlay the unassigned pool.
    let mut segment_offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
    for value in assigned_segments.values() {
        let owned: HashMap<ScopedSegment, Offset> =
            deserialize_from(&value.data).expect("deserialize assigned segments");
        segment_offset_map.extend(owned);
    }
    for (key, value) in unassigned_segments.iter() {
        let segment = ScopedSegment::from(key.as_str());
        let offset: Offset = deserialize_from(&value.data).expect("deserialize offset");
        segment_offset_map.insert(segment, offset);
    }
    debug!("Segment to offset map {:?} from reader group", segment_offset_map);
    segment_offset_map
}
/// Assigns one unassigned segment (if any) to `reader`, returning the segment
/// that was assigned, or `None` when the unassigned pool is empty.
pub async fn assign_segment_to_reader(
    &mut self,
    reader: &Reader,
) -> Result<Option<ScopedSegment>, ReaderGroupStateError> {
    let assigned = self
        .sync
        .insert(|table| ReaderGroupState::assign_segment_to_reader_internal(table, reader))
        .await
        .context(SyncError {
            error_msg: format!("assign segment to reader {:?}", reader),
        })?;
    Ok(assigned.map(|segment_str| ScopedSegment::from(&*segment_str)))
}
/// Table-update closure body for `assign_segment_to_reader`: moves an
/// arbitrary segment from the unassigned pool into the reader's owned map.
fn assign_segment_to_reader_internal(
    table: &mut Update,
    reader: &Reader,
) -> Result<Option<String>, SynchronizerError> {
    let mut assigned_segments = ReaderGroupState::get_reader_owned_segments_from_table(table, reader)?;
    let unassigned_segments = ReaderGroupState::get_unassigned_segments_from_table(table);
    match unassigned_segments.into_iter().next() {
        None => Ok(None),
        Some((segment, offset)) => {
            assigned_segments.insert(segment.clone(), offset);
            table.insert(
                ASSIGNED.to_owned(),
                reader.to_string(),
                "HashMap<ScopedSegment, Offset>".to_owned(),
                Box::new(assigned_segments),
            );
            // Remove the segment from the unassigned pool in the same update.
            table.insert_tombstone(UNASSIGNED.to_owned(), segment.to_string())?;
            Ok(Some(segment.to_string()))
        }
    }
}
/// Returns the (segment, offset) pairs currently owned by `reader`.
///
/// # Errors
/// Returns `SyncUpdateError` if the reader has no entry in the assigned map
/// (i.e. it is not online).
pub async fn get_segments_for_reader(
    &mut self,
    reader: &Reader,
) -> Result<HashSet<(ScopedSegment, Offset)>, SynchronizerError> {
    self.sync.fetch_updates().await.expect("should fetch updates");
    let value = self
        .sync
        .get(ASSIGNED, &reader.to_string())
        // ok_or_else: only format the error message on the failure path.
        .ok_or_else(|| SynchronizerError::SyncUpdateError {
            error_msg: format!("reader {} is not online", reader),
        })?;
    let segments: HashMap<ScopedSegment, Offset> =
        deserialize_from(&value.data).expect("deserialize reader owned segments");
    // Consume the freshly deserialized map directly instead of cloning
    // every key/value pair.
    Ok(segments.into_iter().collect())
}
/// Releases `segment` from `reader` back to the unassigned pool, recording
/// `offset` as the position to resume reading from.
pub async fn release_segment(
    &mut self,
    reader: &Reader,
    segment: &ScopedSegment,
    offset: &Offset,
) -> Result<(), ReaderGroupStateError> {
    debug!(
        "Release segment {:?} for reader {:?} at offset {:?}",
        segment, reader, offset.read
    );
    let outcome = self
        .sync
        .insert(|table| ReaderGroupState::release_segment_internal(table, reader, segment, offset))
        .await;
    outcome.context(SyncError {
        error_msg: format!(
            "Failed to release segment {:?} with offset {:?} from reader {:?} ",
            segment, offset, reader
        ),
    })
}
/// Table-update closure body for `release_segment`: moves `segment` from the
/// reader's owned map back to the unassigned pool at `offset`.
fn release_segment_internal(
    table: &mut Update,
    reader: &Reader,
    segment: &ScopedSegment,
    offset: &Offset,
) -> Result<(), SynchronizerError> {
    let mut assigned_segments = ReaderGroupState::get_reader_owned_segments_from_table(table, reader)?;
    let unassigned_segments = ReaderGroupState::get_unassigned_segments_from_table(table);
    // Fail the update if the reader does not actually own this segment.
    // (Previously this error was constructed but never propagated with `?`,
    // so a missing segment was silently ignored.)
    let old_offset = assigned_segments.remove(segment).context(SyncUpdateError {
        error_msg: format!(
            "Failed to release segment: should contain only one segment {:?} in assigned list but contains none",
            segment,
        )
    })?;
    debug!(
        "Removed segment {:?} from assigned segments, the older offset is {:?}",
        segment, old_offset
    );
    ensure!(
        !unassigned_segments.contains_key(segment),
        SyncUpdateError {
            error_msg: format!(
                "Failed to release segment: unassigned_segment should not have already contained this released segment {:?}",
                segment
            )
        });
    table.insert(
        ASSIGNED.to_owned(),
        reader.to_string(),
        "HashMap<ScopedSegment, Offset>".to_owned(),
        Box::new(assigned_segments),
    );
    table.insert(
        UNASSIGNED.to_owned(),
        // to_string works on &ScopedSegment; the extra clone was unnecessary.
        segment.to_string(),
        "Offset".to_owned(),
        Box::new(offset.to_owned()),
    );
    Ok(())
}
pub async fn segment_completed(
&mut self,
reader: &Reader,
segment_completed: &ScopedSegment,
successors_mapped_to_their_predecessors: &im::HashMap<SegmentWithRange, Vec<Segment>>,
) -> Result<(), ReaderGroupStateError> {
self.sync
.insert(|table| {
ReaderGroupState::segment_completed_internal(
table,
reader,
segment_completed,
successors_mapped_to_their_predecessors,
)
})
.await
.context(SyncError {
error_msg: format!(
"segment {:?} assigned to reader {:?} has completed",
segment_completed, reader
),
})?;
Ok(())
}
/// Table-update closure body for `segment_completed`.
///
/// Removes the completed segment from the reader's owned map, tracks each
/// successor with the set of predecessors it still waits on, marks this
/// segment complete for all tracked successors, and promotes successors with
/// no remaining predecessors into the unassigned pool at offset 0.
fn segment_completed_internal(
    table: &mut Update,
    reader: &Reader,
    segment_completed: &ScopedSegment,
    successors_mapped_to_their_predecessors: &im::HashMap<SegmentWithRange, Vec<Segment>>,
) -> Result<Option<String>, SynchronizerError> {
    let mut assigned_segments = ReaderGroupState::get_reader_owned_segments_from_table(table, reader)?;
    let mut future_segments = ReaderGroupState::get_future_segments_from_table(table);
    // Drop the completed segment from the reader's owned set and persist it.
    assigned_segments
        .remove(segment_completed)
        .expect("should have assigned this segment to reader");
    table.insert(
        ASSIGNED.to_owned(),
        reader.to_string(),
        "HashMap<ScopedSegment, Offset>".to_owned(),
        Box::new(assigned_segments),
    );
    // Register successors not yet tracked, each with the predecessors that
    // must complete before it becomes readable.
    for (segment, list) in successors_mapped_to_their_predecessors {
        if !future_segments.contains_key(&segment.scoped_segment) {
            let required_to_complete: HashSet<_> = list.clone().into_iter().collect();
            table.insert(
                FUTURE.to_owned(),
                segment.scoped_segment.to_string(),
                "HashSet<i64>".to_owned(),
                Box::new(required_to_complete.clone()),
            );
            future_segments.insert(segment.scoped_segment.to_owned(), required_to_complete);
        }
    }
    // Record this segment's completion against every tracked successor.
    for (segment, required_to_complete) in &mut future_segments {
        if required_to_complete.remove(&segment_completed.segment) {
            table.insert(
                FUTURE.to_owned(),
                segment.to_string(),
                "HashSet<i64>".to_owned(),
                Box::new(required_to_complete.to_owned()),
            );
        }
    }
    // Successors with no pending predecessors become unassigned (offset 0)
    // and are removed from the future-segments tracking.
    let ready_to_read = future_segments
        .iter()
        .filter(|&(_segment, set)| set.is_empty())
        .map(|(segment, _set)| segment.to_owned())
        .collect::<Vec<ScopedSegment>>();
    for segment in ready_to_read {
        table.insert(
            UNASSIGNED.to_owned(),
            segment.to_string(),
            "Offset".to_owned(),
            Box::new(Offset::new(0)),
        );
        table.insert_tombstone(FUTURE.to_owned(), segment.to_string())?;
    }
    Ok(None)
}
/// Reads and deserializes `reader`'s owned-segment map from the table,
/// failing with a precondition error if the reader is not online.
fn get_reader_owned_segments_from_table(
    table: &mut Update,
    reader: &Reader,
) -> Result<HashMap<ScopedSegment, Offset>, SynchronizerError> {
    ReaderGroupState::check_reader_online(&table.get_inner_map(ASSIGNED), reader)?;
    // check_reader_online guarantees the entry exists.
    let entry = table
        .get(ASSIGNED, &reader.to_string())
        .expect("No entries found for reader owned segments");
    Ok(deserialize_from(&entry.data).expect("deserialize reader owned segments"))
}
/// Deserializes the unassigned inner map into segment -> offset pairs.
fn get_unassigned_segments_from_table(table: &mut Update) -> HashMap<ScopedSegment, Offset> {
    let mut unassigned = HashMap::new();
    for (key, value) in table.get_inner_map(UNASSIGNED) {
        let offset: Offset = deserialize_from(&value.data).expect("deserialize offset");
        unassigned.insert(ScopedSegment::from(key.as_str()), offset);
    }
    unassigned
}
/// Deserializes the future-segments inner map into successor segment ->
/// remaining-predecessor-set pairs.
fn get_future_segments_from_table(table: &mut Update) -> HashMap<ScopedSegment, HashSet<Segment>> {
    let mut future = HashMap::new();
    for (key, value) in table.get_inner_map(FUTURE) {
        let predecessors: HashSet<Segment> =
            deserialize_from(&value.data).expect("deserialize hashset");
        future.insert(ScopedSegment::from(key.as_str()), predecessors);
    }
    future
}
/// Verifies `reader` has an entry in the assigned inner map, i.e. is online.
fn check_reader_online(
    assigned: &HashMap<String, Value>,
    reader: &Reader,
) -> Result<(), SynchronizerError> {
    if !assigned.contains_key(&reader.name) {
        return Err(SynchronizerError::SyncPreconditionError {
            error_msg: format!("reader {} is not online", reader.name),
        });
    }
    Ok(())
}
}
/// A read position within a segment.
#[derive(new, Serialize, Deserialize, PartialEq, Debug, Clone, Hash, Eq)]
pub struct Offset {
    /// Offset from which the reader should resume reading.
    pub read: i64,
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::sync::synchronizer::{serialize, Value};
    use lazy_static::*;
    use ordered_float::OrderedFloat;
    use pravega_client_shared::{Scope, Segment, Stream};
    lazy_static! {
        // Shared fixtures: one test reader and one segment (scope/scope/0).
        static ref READER: Reader = Reader::from("test".to_owned());
        static ref SEGMENT: ScopedSegment = ScopedSegment {
            scope: Scope {
                name: "scope".to_string(),
            },
            stream: Stream {
                name: "scope".to_string(),
            },
            segment: Segment {
                number: 0,
                tx_id: None,
            },
        };
        static ref SEGMENT_TEST: ScopedSegment = SEGMENT.clone();
    }
    // Builds an Update whose unassigned map holds SEGMENT_TEST at offset 0,
    // mimicking a freshly initialized reader group with one segment.
    fn set_up() -> Update {
        let offset = Box::new(Offset::new(0));
        let data = serialize(&*offset).expect("serialize value");
        let mut unassigned_segments = HashMap::new();
        unassigned_segments.insert(
            SEGMENT_TEST.to_string(),
            Value {
                type_id: "Offset".to_owned(),
                data,
            },
        );
        let mut map = HashMap::new();
        map.insert(UNASSIGNED.to_owned(), unassigned_segments);
        let counter = HashMap::new();
        let table = Update::new(map, counter, vec![], vec![]);
        assert!(table.contains_outer_key(UNASSIGNED));
        assert!(table.contains_key(UNASSIGNED, &SEGMENT_TEST.to_string()));
        table
    }
    // Walks the internal state transitions end to end: add reader, assign a
    // segment, update its position, complete it (spawning two successors),
    // release one successor, and finally remove the reader twice (second
    // removal must fail with a precondition error).
    #[test]
    fn test_reader_group_state() {
        let mut table = set_up();
        ReaderGroupState::add_reader_internal(&mut table, &READER).expect("add reader");
        assert!(
            table.contains_outer_key(ASSIGNED),
            "assigned_segments outer map should be created automatically"
        );
        assert!(
            table.contains_key(ASSIGNED, &READER.to_string()),
            "should contain inner key"
        );
        let mut readers = ReaderGroupState::get_online_readers_internal(table.get_inner_map(ASSIGNED));
        assert_eq!(
            readers.pop().expect("get reader"),
            READER.clone(),
            "should have online reader added"
        );
        assert!(readers.is_empty(), "should have only one reader");
        ReaderGroupState::assign_segment_to_reader_internal(&mut table, &READER)
            .expect("assign segment to reader");
        let segments =
            ReaderGroupState::get_reader_positions_internal(&READER, table.get_inner_map(ASSIGNED))
                .expect("get reader positions");
        assert_eq!(segments.len(), 1, "should have assigned one segment the reader");
        assert_eq!(
            segments.get(&SEGMENT_TEST).expect("get segment"),
            &Offset { read: 0 },
            "added segment should be as expected"
        );
        // Move the owned segment's offset forward and verify persistence.
        let new_offset = Offset { read: 10 };
        let mut update = HashMap::new();
        update.insert(SEGMENT_TEST.clone(), new_offset.clone());
        ReaderGroupState::update_reader_positions_internal(&mut table, &READER, &update)
            .expect("update reader position");
        let segments =
            ReaderGroupState::get_reader_positions_internal(&READER, table.get_inner_map(ASSIGNED))
                .expect("get reader positions");
        assert_eq!(segments.len(), 1, "reader should contain one owned segment");
        assert_eq!(
            segments.get(&SEGMENT_TEST).expect("get segment"),
            &new_offset,
            "the offset of owned segment should be updated"
        );
        // Two successors split the completed segment's key range in half; each
        // waits only on predecessor segment 0.
        let mut successor0 = SEGMENT_TEST.clone();
        successor0.segment.number = 1;
        let successor0_range = SegmentWithRange {
            scoped_segment: successor0.clone(),
            min_key: OrderedFloat(0.0),
            max_key: OrderedFloat(0.5),
        };
        let mut successor1 = SEGMENT_TEST.clone();
        successor1.segment.number = 2;
        let successor1_range = SegmentWithRange {
            scoped_segment: successor1.clone(),
            min_key: OrderedFloat(0.5),
            max_key: OrderedFloat(1.0),
        };
        let mut successors_mapped_to_their_predecessors = im::HashMap::new();
        successors_mapped_to_their_predecessors.insert(
            successor0_range,
            vec![Segment {
                number: 0,
                tx_id: None,
            }],
        );
        successors_mapped_to_their_predecessors.insert(
            successor1_range,
            vec![Segment {
                number: 0,
                tx_id: None,
            }],
        );
        ReaderGroupState::segment_completed_internal(
            &mut table,
            &READER,
            &SEGMENT_TEST,
            &successors_mapped_to_their_predecessors,
        )
        .expect("reader segment completed");
        // Both successors have all predecessors complete, so both should now
        // be available in the unassigned pool.
        assert!(table.contains_key(UNASSIGNED, &successor0.to_string()));
        assert!(table.contains_key(UNASSIGNED, &successor1.to_string()));
        ReaderGroupState::assign_segment_to_reader_internal(&mut table, &READER)
            .expect("assign segment to reader");
        ReaderGroupState::assign_segment_to_reader_internal(&mut table, &READER)
            .expect("assign segment to reader");
        let new_offset = Offset { read: 10 };
        ReaderGroupState::release_segment_internal(&mut table, &READER, &successor0, &new_offset)
            .expect("release segment");
        let segments =
            ReaderGroupState::get_reader_positions_internal(&READER, table.get_inner_map(ASSIGNED))
                .expect("get reader positions");
        assert_eq!(
            segments.len(),
            1,
            "reader should contain 1 segment since the other one is released"
        );
        let mut segments = HashMap::new();
        segments.insert(successor0.clone(), Offset { read: 0 });
        segments.insert(successor1.clone(), Offset { read: 0 });
        ReaderGroupState::remove_reader_internal(&mut table, &READER, &segments)
            .expect("remove online reader");
        // Removing an already-removed reader must surface a precondition error.
        let res = ReaderGroupState::remove_reader_internal(&mut table, &READER, &segments)
            .expect_err("Excepted error");
        match res {
            SynchronizerError::SyncPreconditionError { .. } => {}
            _ => assert!(false, "Invalid error returned"),
        }
        assert_eq!(table.get_inner_map(UNASSIGNED).len(), 2);
    }
}