📄 wrepl_out_helpers.c
字号:
failed: talloc_free(c); return NULL;}static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io){ NTSTATUS status; status = composite_wait(c); if (NT_STATUS_IS_OK(status)) { struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data, struct wreplsrv_pull_table_state); io->out.num_owners = state->table_io.out.num_partners; io->out.owners = talloc_reference(mem_ctx, state->table_io.out.partners); } talloc_free(c); return status; }struct wreplsrv_pull_names_io { struct { struct wreplsrv_partner *partner; struct wreplsrv_out_connection *wreplconn; struct wrepl_wins_owner owner; } in; struct { uint32_t num_names; struct wrepl_name *names; } out;};enum wreplsrv_pull_names_stage { WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION, WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY, WREPLSRV_PULL_NAMES_STAGE_DONE};struct wreplsrv_pull_names_state { enum wreplsrv_pull_names_stage stage; struct composite_context *c; struct wrepl_request *req; struct wrepl_pull_names pull_io; struct wreplsrv_pull_names_io *io; struct composite_context *creq; struct wreplsrv_out_connection *wreplconn;};static void wreplsrv_pull_names_handler_req(struct wrepl_request *req);static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state){ NTSTATUS status; status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn); NT_STATUS_NOT_OK_RETURN(status); state->pull_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx; state->pull_io.in.partner = state->io->in.owner; state->req = wrepl_pull_names_send(state->wreplconn->sock, &state->pull_io); NT_STATUS_HAVE_NO_MEMORY(state->req); state->req->async.fn = wreplsrv_pull_names_handler_req; state->req->async.private = state; state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY; return NT_STATUS_OK;}static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state){ NTSTATUS status; status = wrepl_pull_names_recv(state->req, state, &state->pull_io); NT_STATUS_NOT_OK_RETURN(status); state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE; return NT_STATUS_OK;}static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state){ struct composite_context *c = state->c; switch (state->stage) { case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION: c->status = wreplsrv_pull_names_wait_connection(state); break; case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY: c->status = wreplsrv_pull_names_wait_send_reply(state); c->state = COMPOSITE_STATE_DONE; break; case WREPLSRV_PULL_NAMES_STAGE_DONE: c->status = NT_STATUS_INTERNAL_ERROR; } if (!NT_STATUS_IS_OK(c->status)) { c->state = COMPOSITE_STATE_ERROR; } if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) { c->async.fn(c); }}static void wreplsrv_pull_names_handler_creq(struct composite_context *creq){ struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data, struct wreplsrv_pull_names_state); wreplsrv_pull_names_handler(state); return;}static void wreplsrv_pull_names_handler_req(struct wrepl_request *req){ struct wreplsrv_pull_names_state *state = talloc_get_type(req->async.private, struct wreplsrv_pull_names_state); wreplsrv_pull_names_handler(state); return;}static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io){ struct composite_context *c = NULL; struct wreplsrv_service *service = io->in.partner->service; struct wreplsrv_pull_names_state *state = NULL; enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL; if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE; c = talloc_zero(mem_ctx, struct composite_context); if (!c) goto failed; state = talloc_zero(c, struct wreplsrv_pull_names_state); if (!state) goto failed; state->c = c; state->io = io; c->state = COMPOSITE_STATE_IN_PROGRESS; c->event_ctx = service->task->event_ctx; c->private_data = state; state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION; state->creq = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn); if (!state->creq) goto failed; state->creq->async.fn = wreplsrv_pull_names_handler_creq; state->creq->async.private_data = state; return c;failed: talloc_free(c); return NULL;}static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io){ NTSTATUS status; status = composite_wait(c); if (NT_STATUS_IS_OK(status)) { struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data, struct wreplsrv_pull_names_state); io->out.num_names = state->pull_io.out.num_names; io->out.names = talloc_reference(mem_ctx, state->pull_io.out.names); } talloc_free(c); return status; }enum wreplsrv_pull_cycle_stage { WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY, WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES, WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC, WREPLSRV_PULL_CYCLE_STAGE_DONE};struct wreplsrv_pull_cycle_state { enum wreplsrv_pull_cycle_stage stage; struct composite_context *c; struct wreplsrv_pull_cycle_io *io; struct wreplsrv_pull_table_io table_io; uint32_t current; struct wreplsrv_pull_names_io names_io; struct composite_context *creq; struct wrepl_associate_stop assoc_stop_io; struct wrepl_request *req;};static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state){ struct wreplsrv_owner *current_owner=NULL; struct wreplsrv_owner *local_owner; uint32_t i; uint64_t old_max_version = 0; bool do_pull = false; for (i=state->current; i < state->table_io.out.num_owners; i++) { current_owner = wreplsrv_find_owner(state->io->in.partner->service, state->io->in.partner->pull.table, state->table_io.out.owners[i].address); local_owner = wreplsrv_find_owner(state->io->in.partner->service, state->io->in.partner->service->table, state->table_io.out.owners[i].address); /* * this means we are ourself the current owner, * and we don't want replicate ourself */ if (!current_owner) continue; /* * this means we don't have any records of this owner * so fetch them */ if (!local_owner) { do_pull = true; break; } /* * this means the remote partner has some new records of this owner * fetch them */ if (current_owner->owner.max_version > local_owner->owner.max_version) { do_pull = true; old_max_version = local_owner->owner.max_version; break; } } state->current = i; if (do_pull) { state->names_io.in.partner = state->io->in.partner; state->names_io.in.wreplconn = state->io->in.wreplconn; state->names_io.in.owner = current_owner->owner; state->names_io.in.owner.min_version = old_max_version + 1; state->creq = wreplsrv_pull_names_send(state, &state->names_io); NT_STATUS_HAVE_NO_MEMORY(state->creq); state->creq->async.fn = wreplsrv_pull_cycle_handler_creq; state->creq->async.private_data = state; return STATUS_MORE_ENTRIES; } return NT_STATUS_OK;}static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state){ NTSTATUS status; status = wreplsrv_pull_cycle_next_owner_do_work(state); if (NT_STATUS_IS_OK(status)) { state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE; } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) { state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES; status = NT_STATUS_OK; } if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) { state->assoc_stop_io.in.assoc_ctx = state->io->in.wreplconn->assoc_ctx.peer_ctx; state->assoc_stop_io.in.reason = 0; state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io); NT_STATUS_HAVE_NO_MEMORY(state->req); state->req->async.fn = wreplsrv_pull_cycle_handler_req; state->req->async.private = state; state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC; } return status;}static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state){ NTSTATUS status; uint32_t i; status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io); NT_STATUS_NOT_OK_RETURN(status); /* update partner table */ for (i=0; i < state->table_io.out.num_owners; i++) { status = wreplsrv_add_table(state->io->in.partner->service, state->io->in.partner, &state->io->in.partner->pull.table, state->table_io.out.owners[i].address, state->table_io.out.owners[i].max_version); NT_STATUS_NOT_OK_RETURN(status); } status = wreplsrv_pull_cycle_next_owner_wrapper(state); NT_STATUS_NOT_OK_RETURN(status); return status;}static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state){ NTSTATUS status; status = wreplsrv_apply_records(state->io->in.partner, &state->names_io.in.owner, state->names_io.out.num_names, state->names_io.out.names); NT_STATUS_NOT_OK_RETURN(status); talloc_free(state->names_io.out.names); ZERO_STRUCT(state->names_io); return NT_STATUS_OK;}static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state){ NTSTATUS status; status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io); NT_STATUS_NOT_OK_RETURN(status); /* * TODO: this should maybe an async call, * because we may need some network access * for conflict resolving */ status = wreplsrv_pull_cycle_apply_records(state); NT_STATUS_NOT_OK_RETURN(status); status = wreplsrv_pull_cycle_next_owner_wrapper(state); NT_STATUS_NOT_OK_RETURN(status); return status;}static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state){ NTSTATUS status; status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io); NT_STATUS_NOT_OK_RETURN(status); state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE; return status;}static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state){ struct composite_context *c = state->c; switch (state->stage) { case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -