use crate::client_factory::ClientFactoryAsync;
use crate::segment::event::{Incoming, RoutingInfo};
use crate::segment::writer::{Append, SegmentWriter};
use crate::util::get_random_f64;
use pravega_client_auth::DelegationTokenProvider;
use pravega_client_channel::ChannelSender;
use pravega_client_shared::{ScopedSegment, ScopedStream, StreamSegments, StreamSegmentsWithPredecessors};
use ahash::RandomState;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use tracing::{debug, warn};
pub(crate) struct SegmentSelector {
pub(crate) stream: ScopedStream,
pub(crate) writers: HashMap<ScopedSegment, SegmentWriter, RandomState>,
pub(crate) current_segments: StreamSegments,
pub(crate) sender: ChannelSender<Incoming>,
pub(crate) factory: ClientFactoryAsync,
pub(crate) delegation_token_provider: Arc<DelegationTokenProvider>,
}
impl SegmentSelector {
pub(crate) async fn new(
stream: ScopedStream,
sender: ChannelSender<Incoming>,
factory: ClientFactoryAsync,
) -> Self {
let delegation_token_provider = factory.create_delegation_token_provider(stream.clone()).await;
SegmentSelector {
stream,
writers: HashMap::default(),
current_segments: StreamSegments::new(BTreeMap::new()),
sender,
factory,
delegation_token_provider: Arc::new(delegation_token_provider),
}
}
pub(crate) async fn initialize(&mut self, stream_segments: Option<StreamSegments>) {
if let Some(ss) = stream_segments {
self.current_segments = ss;
} else {
self.current_segments = self
.factory
.controller_client()
.get_current_segments(&self.stream)
.await
.expect("retry failed");
}
self.create_missing_writers().await;
}
pub(crate) fn get_segment_writer(&mut self, routing_key: &Option<String>) -> &mut SegmentWriter {
let segment = self
.current_segments
.get_segment_for_routing_key(routing_key, get_random_f64);
self.writers
.get_mut(segment)
.expect("must have corresponding writer")
}
pub(crate) fn get_segment(&mut self, routing_key: &Option<String>) -> &ScopedSegment {
self.current_segments
.get_segment_for_routing_key(routing_key, get_random_f64)
}
pub(crate) fn get_segment_writer_by_key(&mut self, segment: &ScopedSegment) -> &mut SegmentWriter {
self.writers
.get_mut(segment)
.expect("must have corresponding writer")
}
pub(crate) async fn refresh_segment_event_writers_upon_sealed(
&mut self,
sealed_segment: &ScopedSegment,
) -> Option<Vec<Append>> {
let stream_segments_with_predecessors = self
.factory
.controller_client()
.get_successors(sealed_segment)
.await
.expect("get successors for sealed segment");
if stream_segments_with_predecessors.is_stream_sealed() {
None
} else {
Some(
self.update_segments_upon_sealed(stream_segments_with_predecessors, sealed_segment)
.await,
)
}
}
pub(crate) async fn update_segments_upon_sealed(
&mut self,
successors: StreamSegmentsWithPredecessors,
sealed_segment: &ScopedSegment,
) -> Vec<Append> {
self.current_segments = self
.current_segments
.apply_replacement_range(&sealed_segment.segment, &successors)
.expect("apply replacement range");
self.create_missing_writers().await;
self.writers
.get_mut(sealed_segment)
.expect("get writer")
.get_unacked_events()
}
#[allow(clippy::map_entry)] pub(crate) async fn create_missing_writers(&mut self) {
for scoped_segment in self.current_segments.get_segments() {
if !self.writers.contains_key(&scoped_segment) {
let mut writer = SegmentWriter::new(
scoped_segment.clone(),
self.sender.clone(),
self.factory.config().retry_policy,
self.delegation_token_provider.clone(),
);
debug!(
"writer {:?} created for segment {:?}",
writer.id,
scoped_segment.to_string()
);
if let Err(_e) = writer.setup_connection(&self.factory).await {
writer.reconnect(&self.factory).await;
}
self.writers.insert(scoped_segment, writer);
}
}
}
pub(crate) async fn resend(&mut self, to_resend: Vec<Append>) {
for append in to_resend {
let segment = match &append.event.routing_info {
RoutingInfo::RoutingKey(key) => self
.current_segments
.get_segment_for_routing_key(key, get_random_f64),
RoutingInfo::Segment(segment) => segment,
};
let segment_writer = self.writers.get_mut(segment).expect("must have writer");
segment_writer.add_pending(append.event, append.cap_guard);
if let Err(e) = segment_writer.write_pending_events().await {
warn!(
"failed to resend an event due to: {:?}, reconnecting the event segment writer",
e
);
segment_writer.reconnect(&self.factory).await;
}
}
}
pub(crate) fn remove_segment_writer(&mut self, segment: &ScopedSegment) -> Option<SegmentWriter> {
self.writers.remove(segment)
}
}
#[cfg(test)]
pub(crate) mod test {
use super::*;
use crate::client_factory::ClientFactory;
use im::HashMap as ImHashMap;
use ordered_float::OrderedFloat;
use pravega_client_channel::{create_channel, ChannelReceiver};
use pravega_client_config::connection_type::{ConnectionType, MockType};
use pravega_client_config::ClientConfigBuilder;
use pravega_client_shared::{
PravegaNodeUri, Retention, RetentionType, ScaleType, Scaling, Scope, Segment, SegmentWithRange,
StreamConfiguration,
};
use tokio::runtime::Runtime;
#[test]
fn test_segment_selector() {
let rt = Runtime::new().unwrap();
let (mut selector, _sender, _receiver, _factory) =
rt.block_on(create_segment_selector(MockType::Happy));
assert_eq!(selector.writers.len(), 2);
let mut sp = ImHashMap::new();
let successor1 = SegmentWithRange {
scoped_segment: ScopedSegment::from("testScope/testStream/2"),
min_key: OrderedFloat::from(0.0),
max_key: OrderedFloat::from(0.25),
};
let successor2 = SegmentWithRange {
scoped_segment: ScopedSegment::from("testScope/testStream/3"),
min_key: OrderedFloat::from(0.25),
max_key: OrderedFloat::from(0.5),
};
let pred = vec![Segment::from(0)];
sp.insert(successor1.clone(), pred.clone());
sp.insert(successor2.clone(), pred);
let replace = vec![successor1, successor2];
let mut rs = ImHashMap::new();
rs.insert(Segment::from(0), replace);
let ssp = StreamSegmentsWithPredecessors {
segment_with_predecessors: sp,
replacement_segments: rs,
};
let sealed_segment = ScopedSegment::from("testScope/testStream/0");
let events = rt.block_on(selector.update_segments_upon_sealed(ssp, &sealed_segment));
assert!(events.is_empty());
assert_eq!(selector.writers.len(), 4);
}
pub(crate) async fn create_segment_selector(
mock: MockType,
) -> (
SegmentSelector,
ChannelSender<Incoming>,
ChannelReceiver<Incoming>,
ClientFactory,
) {
let stream = ScopedStream::from("testScope/testStream");
let config = ClientConfigBuilder::default()
.connection_type(ConnectionType::Mock(mock))
.controller_uri(PravegaNodeUri::from("127.0.0.1:9091".to_string()))
.mock(true)
.build()
.unwrap();
let factory = ClientFactory::new(config);
factory
.controller_client()
.create_scope(&Scope {
name: "testScope".to_string(),
})
.await
.unwrap();
factory
.controller_client()
.create_stream(&StreamConfiguration {
scoped_stream: stream.clone(),
scaling: Scaling {
scale_type: ScaleType::FixedNumSegments,
target_rate: 1,
scale_factor: 1,
min_num_segments: 2,
},
retention: Retention {
retention_type: RetentionType::None,
retention_param: 0,
},
tags: None,
})
.await
.unwrap();
let (sender, receiver) = create_channel(1024);
let mut selector = SegmentSelector::new(stream.clone(), sender.clone(), factory.to_async()).await;
let stream_segments = factory
.controller_client()
.get_current_segments(&stream)
.await
.unwrap();
selector.initialize(Some(stream_segments)).await;
(selector, sender, receiver, factory)
}
}