use crate::client_factory::ClientFactoryAsync;
use crate::error::Error;
use crate::segment::event::{Incoming, PendingEvent, ServerReply, WriterInfo};
use crate::segment::raw_client::{RawClient, RawClientError};
use crate::update;
use crate::util::metric::ClientMetrics;
use crate::util::{current_span, get_random_u128, get_request_id};
use pravega_client_auth::DelegationTokenProvider;
use pravega_client_channel::{CapacityGuard, ChannelSender};
use pravega_client_retry::retry_async::retry_async;
use pravega_client_retry::retry_policy::RetryWithBackoff;
use pravega_client_retry::retry_result::{RetryError, RetryResult};
use pravega_client_shared::*;
use pravega_connection_pool::connection_pool::ConnectionPoolError;
use pravega_controller_client::ControllerError;
use pravega_wire_protocol::client_connection::*;
use pravega_wire_protocol::commands::{
AppendBlockEndCommand, ConditionalBlockEndCommand, SetupAppendCommand,
};
use pravega_wire_protocol::error::ClientConnectionError;
use pravega_wire_protocol::wire_commands::{Replies, Requests};
use snafu::{ResultExt, Snafu};
use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;
use tokio::select;
use tokio::sync::oneshot;
use tracing::{debug, error, field, info, info_span, trace};
use tracing_futures::Instrument;
pub(crate) struct SegmentWriter {
pub(crate) id: WriterId,
pub(crate) segment: ScopedSegment,
pub(crate) connection: Option<ClientConnectionWriteHalf>,
connection_listener_handle: Option<oneshot::Sender<bool>>,
inflight: VecDeque<Append>,
pending: VecDeque<Append>,
event_num: i64,
sender: ChannelSender<Incoming>,
retry_policy: RetryWithBackoff,
delegation_token_provider: Arc<DelegationTokenProvider>,
pub(crate) need_reset: bool,
}
impl SegmentWriter {
const MAX_WRITE_SIZE: i32 = 8 * 1024 * 1024 + 8;
const MAX_EVENTS: i32 = 500;
pub(crate) fn new(
segment: ScopedSegment,
sender: ChannelSender<Incoming>,
retry_policy: RetryWithBackoff,
delegation_token_provider: Arc<DelegationTokenProvider>,
) -> Self {
SegmentWriter {
id: WriterId::from(get_random_u128()),
connection: None,
segment,
inflight: VecDeque::new(),
pending: VecDeque::new(),
event_num: 0,
sender,
retry_policy,
delegation_token_provider,
connection_listener_handle: None,
need_reset: false,
}
}
pub(crate) async fn setup_connection(
&mut self,
factory: &ClientFactoryAsync,
) -> Result<(), SegmentWriterError> {
let span = info_span!("setup connection", segment_writer_id= %self.id, segment= %self.segment, host = field::Empty);
async {
info!("setting up connection for segment writer");
let (oneshot_tx, oneshot_rx) = oneshot::channel();
self.connection_listener_handle = Some(oneshot_tx);
let uri = match factory
.controller_client()
.get_endpoint_for_segment(&self.segment) .await
{
Ok(uri) => uri,
Err(e) => return Err(SegmentWriterError::RetryControllerWriting { err: e }),
};
current_span().record("host", &field::debug(&uri));
let request = Requests::SetupAppend(SetupAppendCommand {
request_id: get_request_id(),
writer_id: self.id.0,
segment: self.segment.to_string(),
delegation_token: self
.delegation_token_provider
.retrieve_token(factory.controller_client())
.await,
});
let raw_client = factory.create_raw_client_for_endpoint(uri);
let result = retry_async(self.retry_policy, || async {
debug!(
"setting up append for writer:{:?}/segment:{:?}",
self.id, self.segment
);
match raw_client.send_setup_request(&request).await {
Ok((reply, connection)) => RetryResult::Success((reply, connection)),
Err(e) => {
info!("failed to setup append using rawclient due to {:?}", e);
RetryResult::Retry(e)
}
}
})
.await;
let mut connection = match result {
Ok((reply, connection)) => match reply {
Replies::AppendSetup(cmd) => {
debug!(
"append setup completed for writer:{:?}/segment:{:?} with latest event number {}",
self.id, self.segment, cmd.last_event_number
);
self.ack(cmd.last_event_number);
connection
}
_ => {
info!("append setup failed due to {:?}", reply);
return Err(SegmentWriterError::WrongReply {
expected: String::from("AppendSetup"),
actual: reply,
});
}
},
Err(e) => return Err(SegmentWriterError::RetryRawClient { err: e }),
};
let (r, w) = connection.split();
self.connection = Some(w);
self.spawn_listener_task(r, oneshot_rx);
info!("finished setting up connection");
Ok(())
}
.instrument(span)
.await
}
fn spawn_listener_task(&self, mut r: ClientConnectionReadHalf, mut oneshot_rx: oneshot::Receiver<bool>) {
let segment = self.segment.clone();
let sender = self.sender.clone();
let listener_id = r.get_id();
let writer_id = self.id;
tokio::spawn(async move {
loop {
select! {
_ = &mut oneshot_rx => {
debug!("shut down connection listener {:?}", listener_id);
break;
}
result = r.read() => {
let reply = match result {
Ok(reply) => reply,
Err(e) => {
info!("connection failed to read data back from segmentstore due to {:?}, closing listener task {:?}", e, listener_id);
let result = sender
.send((Incoming::Reconnect(WriterInfo {
segment: segment.clone(),
connection_id: Some(r.get_id()),
writer_id,
}), 0)).await;
if let Err(e) = result {
error!("failed to send connectionFailure signal to reactor {:?}", e);
}
break;
}
};
let res = sender
.send((Incoming::ServerReply(ServerReply {
segment: segment.clone(),
reply,
}), 0))
.await;
if let Err(e) = res {
error!("connection read data from segmentstore but failed to send reply back to reactor due to {:?}",e);
break;
}
}
}
}
info!("listener task shut down {:?}", listener_id);
});
}
pub(crate) async fn write(
&mut self,
event: PendingEvent,
cap_guard: CapacityGuard,
) -> Result<(), SegmentWriterError> {
self.add_pending(event, cap_guard);
self.write_pending_events().await
}
pub(crate) fn add_pending(&mut self, event: PendingEvent, cap_guard: CapacityGuard) {
self.event_num += 1;
self.pending.push_back(Append {
event_id: self.event_num,
event,
cap_guard,
});
}
pub(crate) async fn write_pending_events(&mut self) -> Result<(), SegmentWriterError> {
if !self.inflight.is_empty() || self.pending.is_empty() {
return Ok(());
}
let mut total_size = 0;
let mut to_send = vec![];
let mut event_count = 0;
let conditional = self.pending.front().unwrap().event.conditional_offset.is_some();
let mut offset: i64 = -1;
while let Some(append) = self.pending.pop_front() {
assert!(
append.event.data.len() <= SegmentWriter::MAX_WRITE_SIZE as usize,
"event size {} must be under {}",
append.event.data.len(),
SegmentWriter::MAX_WRITE_SIZE
);
if append.event.data.len() + to_send.len() <= SegmentWriter::MAX_WRITE_SIZE as usize
&& event_count < SegmentWriter::MAX_EVENTS as usize
&& conditional == append.event.conditional_offset.is_some()
{
if conditional {
let event_offset = append.event.conditional_offset.as_ref().unwrap().to_owned();
if offset == -1 {
offset = event_offset + append.event.data.len() as i64;
} else if offset != event_offset {
self.pending.push_front(append);
break;
} else {
offset += append.event.data.len() as i64;
}
}
event_count += 1;
total_size += append.event.data.len();
to_send.extend(&append.event.data);
self.inflight.push_back(append);
} else {
self.pending.push_front(append);
break;
}
}
debug!(
"flushing {} events of total size {} to segment {:?} based on offset {:?}; event segment writer id {:?}/connection id: {:?}",
event_count,
to_send.len(),
self.segment.to_string(),
self.inflight.front().as_ref().unwrap().event.conditional_offset,
self.id,
self.connection.as_ref().expect("must have connection").get_id(),
);
let request = if let Some(offset) = self.inflight.front().unwrap().event.conditional_offset {
Requests::ConditionalBlockEnd(ConditionalBlockEndCommand {
writer_id: self.id.0,
event_number: self.inflight.back().expect("last event").event_id,
expected_offset: offset,
data: to_send,
request_id: get_request_id(),
})
} else {
Requests::AppendBlockEnd(AppendBlockEndCommand {
writer_id: self.id.0,
size_of_whole_events: total_size as i32,
data: to_send,
num_event: self.inflight.len() as i32,
last_event_number: self.inflight.back().expect("last event").event_id,
request_id: get_request_id(),
})
};
let writer = self.connection.as_mut().expect("must have connection");
writer.write(&request).await.context(SegmentWriting {})?;
update!(ClientMetrics::AppendBlockSize, total_size as u64, "Segment Writer Id" => self.id.to_string());
update!(
ClientMetrics::OutstandingAppendCount,
self.pending.len() as u64,
"Segment Writer Id" => self.id.to_string()
);
Ok(())
}
pub(crate) fn ack(&mut self, event_id: i64) {
if self.inflight.is_empty() {
return;
}
if event_id < 0 {
return;
}
loop {
let acked = self.inflight.front().expect("must not be empty");
if event_id < acked.event_id {
return;
}
let acked = self.inflight.pop_front().expect("must have");
if acked.event.oneshot_sender.send(Result::Ok(())).is_err() {
trace!(
"failed to send ack back to caller using oneshot due to Receiver dropped: event id {:?}",
acked.event_id
);
}
if let Some(flush_sender) = acked.event.flush_oneshot_sender {
if flush_sender.send(Result::Ok(())).is_err() {
info!(
"failed to send ack back to caller using oneshot due to Receiver dropped: event id {:?}",
acked.event_id
);
}
}
if acked.event_id == event_id {
break;
}
}
}
pub(crate) fn get_unacked_events(&mut self) -> Vec<Append> {
let mut ret = vec![];
while let Some(append) = self.inflight.pop_front() {
ret.push(append);
}
while let Some(append) = self.pending.pop_front() {
ret.push(append);
}
ret
}
pub(crate) async fn reconnect(&mut self, factory: &ClientFactoryAsync) {
debug!("Reconnecting segment writer {:?}", self.id);
let connection_id = self.connection.as_ref().map(|c| c.get_id());
let setup_res = self.setup_connection(factory).await;
if setup_res.is_err() {
self.sender
.send((
Incoming::Reconnect(WriterInfo {
segment: self.segment.clone(),
connection_id,
writer_id: self.id,
}),
0,
))
.await
.expect("send reconnect signal to reactor");
return;
}
while self.inflight.back().is_some() {
self.pending
.push_front(self.inflight.pop_back().expect("must have event"));
}
let flush_res = self.write_pending_events().await;
if flush_res.is_err() {
self.sender
.send((
Incoming::Reconnect(WriterInfo {
segment: self.segment.clone(),
connection_id,
writer_id: self.id,
}),
0,
))
.await
.expect("send reconnect signal to reactor");
}
}
pub(crate) fn get_pending_queue_size(&mut self) -> usize {
self.pending.len()
}
pub(crate) fn signal_delegation_token_expiry(&self) {
self.delegation_token_provider.signal_token_expiry()
}
pub(crate) fn fail_events_upon_conditional_check_failure(&mut self, event_id: i64) {
while let Some(append) = self.inflight.pop_back() {
if append.event_id >= event_id {
let _res = append
.event
.oneshot_sender
.send(Result::Err(Error::ConditionalCheckFailure {
msg: format!("Conditional check failed for event {}", append.event_id),
}));
} else {
self.inflight.push_back(append);
break;
}
}
while let Some(append) = self.pending.pop_back() {
let _res = append
.event
.oneshot_sender
.send(Result::Err(Error::ConditionalCheckFailure {
msg: format!(
"Conditional check failed in previous appends. Event id {}",
append.event_id
),
}));
}
}
}
impl fmt::Debug for SegmentWriter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SegmentWriter")
.field("segment writer id", &self.id)
.field("segment", &self.segment)
.field(
"connection",
&match &self.connection {
Some(w) => format!("WritingClientConnection is {:?}", w),
None => "doesn't have connection".to_owned(),
},
)
.field(
"inflight",
&format!("number of inflight events is {}", &self.inflight.len()),
)
.field(
"pending",
&format!("number of pending events is {}", &self.pending.len()),
)
.field("current event_number", &self.event_num)
.finish()
}
}
pub(crate) struct Append {
pub(crate) event_id: i64,
pub(crate) event: PendingEvent,
pub(crate) cap_guard: CapacityGuard,
}
#[derive(Debug, Snafu)]
pub enum SegmentWriterError {
#[snafu(display("Failed to send request to segmentstore due to: {:?}", source))]
SegmentWriting { source: ClientConnectionError },
#[snafu(display("Retry failed due to error: {:?}", err))]
RetryControllerWriting { err: RetryError<ControllerError> },
#[snafu(display("Retry connection pool failed due to error {:?}", err))]
RetryConnectionPool { err: RetryError<ConnectionPoolError> },
#[snafu(display("Retry raw client failed due to error {:?}", err))]
RetryRawClient { err: RetryError<RawClientError> },
#[snafu(display("Wrong reply, expected {:?} but get {:?}", expected, actual))]
WrongReply { expected: String, actual: Replies },
#[snafu(display("Wrong host: {:?}", error_msg))]
WrongHost { error_msg: String },
#[snafu(display("Reactor is closed due to: {:?}", msg))]
ReactorClosed { msg: String },
#[snafu(display("Conditional check failed: {}", msg))]
ConditionalCheckFailure { msg: String },
}
#[cfg(test)]
pub(crate) mod test {
use super::*;
use crate::client_factory::ClientFactory;
use crate::segment::event::RoutingInfo;
use pravega_client_channel::{create_channel, ChannelReceiver};
use pravega_client_config::connection_type::{ConnectionType, MockType};
use pravega_client_config::ClientConfigBuilder;
use tokio::sync::oneshot;
type EventHandle = oneshot::Receiver<Result<(), Error>>;
#[test]
fn test_segment_writer_happy_write() {
let rt = tokio::runtime::Runtime::new().unwrap();
let (mut segment_writer, mut sender, mut receiver, factory) = create_segment_writer(MockType::Happy);
let result = rt.block_on(segment_writer.setup_connection(&factory.to_async()));
assert!(result.is_ok());
let (event, guard, event_handle1) = rt.block_on(create_event(512, &mut sender, &mut receiver, None));
let (reply, cap_guard) = rt
.block_on(async {
segment_writer.write(event, guard).await.expect("write data");
return receiver.recv().await;
})
.expect("receive DataAppend from segment writer");
assert_eq!(cap_guard.size, 0);
assert_eq!(segment_writer.event_num, 1);
assert_eq!(segment_writer.inflight.len(), 1);
assert!(segment_writer.pending.is_empty());
let (event, guard, event_handle2) = rt.block_on(create_event(512, &mut sender, &mut receiver, None));
rt.block_on(segment_writer.write(event, guard))
.expect("write data");
assert_eq!(segment_writer.event_num, 2);
assert_eq!(segment_writer.inflight.len(), 1);
assert_eq!(segment_writer.pending.len(), 1);
ack_server_reply(reply, &mut segment_writer);
rt.block_on(segment_writer.write_pending_events())
.expect("write data");
let (reply, cap_guard) = rt
.block_on(receiver.recv())
.expect("receive DataAppend from segment writer");
ack_server_reply(reply, &mut segment_writer);
assert_eq!(cap_guard.size, 0);
assert!(segment_writer.inflight.is_empty());
assert!(segment_writer.pending.is_empty());
let caller_reply = rt.block_on(event_handle1).expect("caller receive reply");
assert!(caller_reply.is_ok());
let caller_reply = rt.block_on(event_handle2).expect("caller receive reply");
assert!(caller_reply.is_ok());
}
#[test]
fn test_segment_writer_reply_error() {
let rt = tokio::runtime::Runtime::new().unwrap();
let (mut segment_writer, mut sender, mut receiver, factory) =
create_segment_writer(MockType::SegmentIsSealed);
let result = rt.block_on(segment_writer.setup_connection(&factory.to_async()));
assert!(result.is_ok());
let (event, guard, _event_handle) = rt.block_on(create_event(512, &mut sender, &mut receiver, None));
let (reply, cap_guard) = rt
.block_on(async {
segment_writer.write(event, guard).await.expect("write data");
receiver.recv().await
})
.expect("receive DataAppend from segment writer");
assert_eq!(cap_guard.size, 0);
let result = if let Incoming::ServerReply(server) = reply {
if let Replies::SegmentIsSealed(_cmd) = server.reply {
true
} else {
false
}
} else {
false
};
assert!(result);
}
#[test]
fn test_segment_writer_mixed_writes() {
let rt = tokio::runtime::Runtime::new().unwrap();
let (mut segment_writer, mut sender, mut receiver, factory) = create_segment_writer(MockType::Happy);
let result = rt.block_on(segment_writer.setup_connection(&factory.to_async()));
assert!(result.is_ok());
let (event0, guard0, _event_handle) =
rt.block_on(create_event(128, &mut sender, &mut receiver, Some(0)));
let (event1, guard1, _event_handle) =
rt.block_on(create_event(128, &mut sender, &mut receiver, None));
segment_writer.add_pending(event0, guard0);
segment_writer.add_pending(event1, guard1);
rt.block_on(segment_writer.write_pending_events()).expect("write");
assert_eq!(segment_writer.pending.len(), 1);
assert_eq!(segment_writer.inflight.len(), 1);
segment_writer.inflight.clear();
segment_writer.pending.clear();
let (event0, guard0, _event_handle) =
rt.block_on(create_event(128, &mut sender, &mut receiver, Some(0)));
let (event1, guard1, _event_handle) =
rt.block_on(create_event(128, &mut sender, &mut receiver, Some(128)));
segment_writer.add_pending(event0, guard0);
segment_writer.add_pending(event1, guard1);
rt.block_on(segment_writer.write_pending_events()).expect("write");
assert_eq!(segment_writer.pending.len(), 0);
assert_eq!(segment_writer.inflight.len(), 2);
segment_writer.inflight.clear();
segment_writer.pending.clear();
let (event0, guard0, _event_handle) =
rt.block_on(create_event(128, &mut sender, &mut receiver, Some(0)));
let (event1, guard1, _event_handle) =
rt.block_on(create_event(128, &mut sender, &mut receiver, Some(256)));
segment_writer.add_pending(event0, guard0);
segment_writer.add_pending(event1, guard1);
rt.block_on(segment_writer.write_pending_events()).expect("write");
assert_eq!(segment_writer.pending.len(), 1);
assert_eq!(segment_writer.inflight.len(), 1);
}
#[test]
fn test_segment_writer_conditional_append_failure() {
let rt = tokio::runtime::Runtime::new().unwrap();
let (mut segment_writer, mut sender, mut receiver, factory) = create_segment_writer(MockType::Happy);
let result = rt.block_on(segment_writer.setup_connection(&factory.to_async()));
assert!(result.is_ok());
let (event0, guard0, _event_handle) =
rt.block_on(create_event(128, &mut sender, &mut receiver, Some(1)));
let (event1, guard1, _event_handle) =
rt.block_on(create_event(128, &mut sender, &mut receiver, Some(129)));
let (event2, guard2, _event_handle) =
rt.block_on(create_event(128, &mut sender, &mut receiver, Some(257)));
segment_writer.add_pending(event0, guard0);
segment_writer.add_pending(event1, guard1);
segment_writer.add_pending(event2, guard2);
rt.block_on(segment_writer.write_pending_events()).expect("write");
assert_eq!(segment_writer.pending.len(), 0);
assert_eq!(segment_writer.inflight.len(), 3);
segment_writer.fail_events_upon_conditional_check_failure(1);
assert_eq!(segment_writer.pending.len(), 0);
assert_eq!(segment_writer.inflight.len(), 0);
let (event0, guard0, _event_handle) =
rt.block_on(create_event(128, &mut sender, &mut receiver, Some(1)));
let (event1, guard1, _event_handle) =
rt.block_on(create_event(128, &mut sender, &mut receiver, Some(129)));
let (event2, guard2, _event_handle) =
rt.block_on(create_event(128, &mut sender, &mut receiver, Some(257)));
segment_writer.add_pending(event0, guard0);
segment_writer.add_pending(event1, guard1);
rt.block_on(segment_writer.write_pending_events()).expect("write");
segment_writer.add_pending(event2, guard2);
assert_eq!(segment_writer.pending.len(), 1);
assert_eq!(segment_writer.inflight.len(), 2);
segment_writer.fail_events_upon_conditional_check_failure(1);
assert_eq!(segment_writer.pending.len(), 0);
assert_eq!(segment_writer.inflight.len(), 0);
}
pub(crate) fn create_segment_writer(
mock: MockType,
) -> (
SegmentWriter,
ChannelSender<Incoming>,
ChannelReceiver<Incoming>,
ClientFactory,
) {
let segment = ScopedSegment::from("testScope/testStream/0");
let config = ClientConfigBuilder::default()
.connection_type(ConnectionType::Mock(mock))
.controller_uri(PravegaNodeUri::from("127.0.0.1:9091".to_string()))
.mock(true)
.build()
.unwrap();
let factory = ClientFactory::new(config);
let (sender, receiver) = create_channel(1024);
let delegation_token_provider = DelegationTokenProvider::new(ScopedStream::from(&segment));
(
SegmentWriter::new(
segment,
sender.clone(),
factory.config().retry_policy,
Arc::new(delegation_token_provider),
),
sender,
receiver,
factory,
)
}
async fn create_event(
size: usize,
sender: &mut ChannelSender<Incoming>,
receiver: &mut ChannelReceiver<Incoming>,
offset: Option<i64>,
) -> (PendingEvent, CapacityGuard, EventHandle) {
let (oneshot_sender, oneshot_receiver) = tokio::sync::oneshot::channel();
let event = PendingEvent::new(
RoutingInfo::RoutingKey(Some("routing_key".into())),
vec![1; size],
offset,
oneshot_sender,
None,
)
.expect("create pending event");
sender.send((Incoming::AppendEvent(event), size)).await.unwrap();
loop {
let (event, guard) = receiver.recv().await.unwrap();
if let Incoming::AppendEvent(e) = event {
return (e, guard, oneshot_receiver);
}
}
}
fn ack_server_reply(reply: Incoming, writer: &mut SegmentWriter) {
let result = if let Incoming::ServerReply(server) = reply {
if let Replies::DataAppended(cmd) = server.reply {
writer.ack(cmd.event_number);
true
} else {
false
}
} else {
false
};
assert!(result);
}
}