Hey guys. I'll try to keep this as short as possible.
I was testing some thresholds with a simple Service Broker application and ran into something interesting that I'm trying to find a workaround for. I set up a simple service on a single instance and am sending messages to and from that same instance. I assigned an activation procedure to the queue, with a maximum of 20 parallel activations.
Originally, the procedure read messages off the queue using a process with these major steps:
- Loop until there are no more messages to process:
  - Call GET CONVERSATION GROUP with a 3000 ms timeout, into variable x.
  - If a group was retrieved, RECEIVE TOP (1) from the queue WHERE conversation_group_id = x.
  - If no group was retrieved, exit.
With this configuration, I sent a few thousand messages to the queue and monitored the queue count, the row count of the user (logging) table, and the number of activated tasks. To my surprise, even though thousands of messages were waiting in the queue, only a single task was processing them, at about 1 message per second.
I tried over and over with different WAITFOR values, and no additional tasks were started until the WAITFOR was 6 seconds or more... even then, only about 3-4 ran in parallel at any time.
Given the way activation works, I then tried it without GET CONVERSATION GROUP: I rewrote the activation procedure to skip that call and simply perform a RECEIVE without any WHERE clause. I also removed the WAITFOR DELAY and let it process messages as fast as it could...
This time, I sent the same number of messages and checked the same counts, and within 20 seconds or so all 20 parallel instances of the activation procedure were processing messages off the queue, and doing so very quickly. The same thing happened when I put the WAITFOR DELAY back in...
The problem is that this approach doesn't allow related messages to be processed together, even though it seems much better at allowing parallel activations. I tried another rewrite using two RECEIVE statements (the first without a WHERE clause, and the second, after processing the first message, with a WHERE clause on the conversation group returned by the first) to see whether that would allow better parallel activation, but it worked about the same as the GET CONVERSATION GROUP version.
So, if I have an application that mostly doesn't use grouping, and I want optimal parallel activation when needed but still need to handle the times when related messages arrive and must be processed together, what can I do? Any ideas?
I have test scripts that can reproduce this behavior if needed. Thanks in advance,
Can you send your scripts to Rushi (dot) Desai (at) microsoft (dot) com so that I can take a look at what is going on? Activation should not start tasks more frequently than once every 5 seconds, whether you use GET CONVERSATION GROUP or RECEIVE directly. Given that, starting 20 concurrent queue readers should take 100 seconds or more.

Hi Rushi... you're correct, it appears that tasks are started at about 1 every 5 seconds at best. I'm not sure how I got the original 20 seconds I mentioned above, but I can't reproduce it no matter what I do... must have been a brain mix-up on my part, sorry.
I can still forward you the scripts, but I'm mostly curious whether anything can be done to 'force' the same kind of activation when using GET CONVERSATION GROUP followed by a RECEIVE filtered on that group. If my queue receives about 50 messages a second, 99.9% of which have unique conversation_group_id values, and each message takes about 1/2 second to process, the best I seem to get is about 3 parallel activations, which doesn't come close to keeping up with the arrival rate...
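A back-of-the-envelope check of those numbers, using Little's law and assuming a steady arrival rate of 50 messages/second and a fixed 1/2 second of work per message with no other overhead (both simplifications):

```latex
\begin{aligned}
\text{readers needed} &\approx \lambda\,W = 50\,\tfrac{\text{msg}}{\text{s}} \times 0.5\,\text{s} = 25,\\
\text{throughput of 3 readers} &= 3 \times \tfrac{1\,\text{msg}}{0.5\,\text{s}} = 6\,\tfrac{\text{msg}}{\text{s}},\\
\text{backlog growth} &\approx 50 - 6 = 44\,\tfrac{\text{msg}}{\text{s}}.
\end{aligned}
```

Under the same assumptions, 20 readers (the max_queue_readers used in these tests) would process 40 messages/second, which is still short of 50.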
Thanks,
You should not notice any difference in the behavior of activation if you do:
WAITFOR (GET CONVERSATION GROUP ...)
... something that takes less than a second or two to complete ...
RECEIVE ... WHERE conversation_group_id = ...
... process message for long time...
and
WAITFOR (RECEIVE ... )
... process message for long time...
50 messages on unique conversation groups, at about 1/2 second per message, would play out roughly as follows:
t = 0: Start first task
5 < t < 10: Consumed 10 messages, start second task
10 < t < 15: Consumed 30 messages; start third task
15 < t < 20: Consumed 50 messages
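The rows above follow from a simple idealized model, assuming each task handles 1/0.5 = 2 messages per second, one new task is started per 5-second window, and all 50 messages are already queued at t = 0. During the k-th window, k tasks each drain 2 × 5 = 10 messages, so the cumulative count after n windows is:

```latex
C_n = \sum_{k=1}^{n} 10k = 5n(n+1), \qquad C_1 = 10, \quad C_2 = 30, \quad C_3 = 60 \ge 50
```

So the 50-message backlog is used up during the third window, with only three tasks ever started.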
Unfortunately, in SQL Server 2005 the 5-second interval cannot be adjusted, so users cannot tune the "responsiveness" of activation. There is a tradeoff between how quickly activation should launch additional tasks and how much time it should give existing tasks to drain the queue. In future releases it would be nice to let users control responsiveness. I would recommend that you submit this through product feedback (http://lab.msdn.microsoft.com/productfeedback/) so that we can open an issue to resolve it in the next release.
A possible workaround for now is to use multiple identical services (and queues) and partition your dialogs among those services. Each queue is activated independently, so more tasks can be started in total. (We know of at least one internal customer who is doing this.)
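A minimal sketch of that workaround, reusing the objects from the test scripts posted later in this thread (sbTest, ct_test1, mt_test1, sv_test1). The names dbo.qu_test2, sv_test2 and dbo.cp_test2 are hypothetical: dbo.cp_test2 is assumed to be a copy of dbo.cp_test1 that reads from dbo.qu_test2, and it should be created before the queue. Alternating on a loop counter is just one possible partitioning rule; any rule that spreads dialogs across the services would do.

```sql
use sbTest
go
-- Hypothetical second queue, identical to dbo.qu_test1 but activating dbo.cp_test2
-- (assumed: a copy of dbo.cp_test1 whose GET CONVERSATION GROUP / RECEIVE read dbo.qu_test2)
create queue dbo.qu_test2 with
    status = on,
    retention = off,
    activation (
        status = on,
        procedure_name = dbo.cp_test2,
        max_queue_readers = 20,
        execute as 'dbo'
    )
on [default];
go
-- Hypothetical second service on the same contract
create service sv_test2
    on queue dbo.qu_test2
    (ct_test1);
go
-- Sender side: alternate dialogs between the two services. Each queue has its own
-- activation monitor, so each one ramps up its own readers independently.
declare @i int, @hConversation uniqueidentifier;
set @i = 0;
while @i < 100
begin
    begin transaction;
    if @i % 2 = 0
    begin
        begin dialog conversation @hConversation
            from service sv_test1
            to service 'sv_test1'
            on contract ct_test1
            with encryption = off;
    end
    else
    begin
        begin dialog conversation @hConversation
            from service sv_test2
            to service 'sv_test2'
            on contract ct_test1
            with encryption = off;
    end;
    send on conversation @hConversation
        message type mt_test1
        (N'<message>Test Message to Target</message>');
    commit transaction;
    set @i = @i + 1;
end
go
```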
Thanks,
Rushi
Hey again Rushi.
Well, I notice a HUGE difference in activation between those 2 exact scenarios. Using GET CONVERSATION GROUP followed by a RECEIVE with a WHERE clause, I can't get more than a single activation. However, using just a RECEIVE without a WHERE clause and no call to GET CONVERSATION GROUP, I get max activations, at 1 new activation every 5 seconds or so.
I'm not so concerned about the 5-second activation interval, as that would be plenty fast under load, and once the tasks are activated, they keep running until they exit (i.e. when the queue is empty).
Since I can't attach scripts, I'll post another message immediately following this one with the full text of the scripts, which easily reproduce this difference in behavior. To use the scripts:
1) SCRIPT 1 - creates the sample db, broker objects, endpoint, queue, service, etc., including the activation procedure (cp_test1). Notice that in this initial version the procedure uses method #1 from above (i.e. a call to GET CONVERSATION GROUP, followed by a RECEIVE with a WHERE clause): it pulls TOP (1) from the queue with the group filter, inserts it into a logging table, and then pauses for 1/2 second before continuing.
2) SCRIPT 2 - simply sends @c messages (2000 as posted) into the queue, pausing for @delay (2/100ths of a second as posted) between sends.
Run SCRIPT 1, then set up another script that simply monitors the count of activated tasks, then start SCRIPT 2. You should notice that, even though messages are arriving much faster than they are processed, only a single task is activated no matter how long you wait.
3) SCRIPT 3 - alters the cp_test1 procedure to use method #2 from above (i.e. no call to GET CONVERSATION GROUP, just a RECEIVE without a WHERE clause). Run it to modify the proc, then start the test over once all activated tasks have either exited or been killed.
After running SCRIPT 3, you'll notice that tasks get activated every 5 seconds, up to max activations...
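The "count of activated tasks" monitor from the steps above can be a simple polling loop run from a separate connection. This sketch reads the sys.dm_broker_activated_tasks and sys.dm_broker_queue_monitors dynamic management views; the 24 samples at 5-second intervals are arbitrary choices.

```sql
use sbTest
go
-- Poll every 5 seconds (about 2 minutes in total) and print: messages still in the queue,
-- rows logged by the activation procedure, and activated copies of the procedure running now
declare @samples int, @queued int, @logged int, @tasks int;
set @samples = 24;
while @samples > 0
begin
    select @queued = count(*) from dbo.qu_test1;
    select @logged = count(*) from dbo.zztemp_messages;
    select @tasks = count(*)
    from sys.dm_broker_activated_tasks
    where database_id = db_id()
      and queue_id = object_id('dbo.qu_test1');
    -- WITH NOWAIT flushes each line immediately instead of at the end of the batch
    raiserror('queued=%d logged=%d activated_tasks=%d', 0, 1, @queued, @logged, @tasks) with nowait;
    set @samples = @samples - 1;
    waitfor delay '00:00:05';
end
-- Current state of the queue's activation monitor
select m.state, m.tasks_waiting, m.last_activated_time, m.last_empty_rowset_time
from sys.dm_broker_queue_monitors as m
where m.database_id = db_id()
  and m.queue_id = object_id('dbo.qu_test1');
go
```

The last_empty_rowset_time column records the last time a RECEIVE on the queue returned an empty result set, which is worth watching alongside the task count when comparing the two versions of cp_test1.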
--
SCRIPT 1
--
/*
use master
drop database sbTest
drop endpoint ep_ServiceBroker_tcp
*/
execute as login = 'sa'
go
create database sbTest
go
use sbTest
go
create master key encryption by password = 'jpaiweqf2q938hq2-3980nhf9piunfwpojf';
alter master key add encryption by service master key;
go
create endpoint ep_ServiceBroker_tcp
state = stopped
as tcp (listener_port = 4022, listener_ip = all)
for service_broker (authentication = windows, encryption = disabled,
message_forwarding = disabled);
go
alter database sbTest set enable_broker;
alter database sbTest set error_broker_conversations;
go
create message type mt_test1 validation = well_formed_xml;
go
create contract ct_test1 (mt_test1 sent by initiator);
go
create procedure dbo.cp_test1
as
declare @conversation_group_id uniqueidentifier,
    @conversation_handle uniqueidentifier,
    @msg_type_name nvarchar(128)
declare @messages table (status tinyint, priority int, queing_order bigint, conversation_group_id uniqueidentifier,
    conversation_handle uniqueidentifier, message_sequence_number bigint, service_name nvarchar(512),
    service_id int, service_contract_name nvarchar(256), service_contract_id int, message_type_name nvarchar(256),
    message_type_id int, validation nchar(2), message_body varbinary(max))
-- Loop until we manually break
while 1 = 1 begin
    -- Start a transaction to receive the appropriate group
    begin transaction;
    -- Reset the id so a value left over from the previous iteration can't be mistaken for a new group
    set @conversation_group_id = null;
    -- Get the next available conversation group...wait for a few seconds then loop...
    waitfor (
        get conversation group @conversation_group_id from dbo.qu_test1),
    timeout 3000;
    -- If we didn't get anything, break since we're all done and will be re-awoken automatically if needed
    if @conversation_group_id is null begin
        if @@trancount > 0
            rollback transaction;
        break;
    end
    -- Got a group, so process all the messages for said group...
    while 1 = 1 begin
        -- Receive the next message for the conversation group...notice the where clause in the
        -- receive statement, which ensures we only get messages for this group...no need for a waitfor here,
        -- as calling get conversation group locks that group (so no one else gets any messages for it), and
        -- we wouldn't even get to this point if there wasn't a message for this group...
        receive top (1) *
        from dbo.qu_test1
        into @messages
        where conversation_group_id = @conversation_group_id;
        if @@rowcount = 0 or @@error <> 0
            -- An empty result set means no more messages for this group (or something happened in the receive)
            break;
        -- Get the information that we need to process the message
        select @conversation_handle = conversation_handle,
            @msg_type_name = message_type_name
        from @messages
        -- If an error occurred (on the other end), or the message is an end conversation message,
        -- end the conversation from our end
        if @msg_type_name in('http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
            'http://schemas.microsoft.com/SQL/ServiceBroker/Error') begin
            end conversation @conversation_handle;
        end
        -- Stick the message data into our logging table
        begin try
            insert dbo.zztemp_messages
            select status, priority, queing_order, conversation_group_id, conversation_handle,
                message_sequence_number, service_name, service_id, service_contract_name,
                service_contract_id, message_type_name, message_type_id, validation, message_body, getdate()
            from @messages;
            -- Simulate think time for doing some work for a half a second or so
            waitfor delay '00:00:00.500'
        end try
        begin catch
            select error_message() as err_msg, error_number() as err_number,
                error_line() as err_line, error_procedure() as err_proc;
            print 'Caught error ' + quotename(error_message());
            rollback transaction;
            break;
        end catch
        delete @messages
    end -- inner - while 1 = 1
    -- All done processing this conversation group, commit now
    -- (skipped if the catch block has already rolled the transaction back)
    if @@trancount > 0
        commit transaction
end -- while 1 = 1
go
if isnull(object_id('dbo.zztemp_messages'), 0) = 0
create table dbo.zztemp_messages (status tinyint, priority int, queing_order bigint, conversation_group_id uniqueidentifier,
conversation_handle uniqueidentifier, message_sequence_number bigint, service_name nvarchar(512),
service_id int, service_contract_name nvarchar(256), service_contract_id int, message_type_name nvarchar(256),
message_type_id int, validation nchar(2), message_body varbinary(max), dt datetime default getdate());
go
-- Create the queue for the source server
create queue dbo.qu_test1 with
status = on,
retention = off,
activation (
status = on,
procedure_name = dbo.cp_test1,
max_queue_readers = 20,
execute as 'dbo'
)
on [default];
go
-- Create the source service
create service sv_test1
on queue dbo.qu_test1
(ct_test1);
go
alter endpoint ep_ServiceBroker_tcp state = started
go
--
SCRIPT 2
--
use sbTest
go
declare @i int, @c int, @delay varchar(25)
declare @msg xml, @hConversation uniqueidentifier;
select @i = 0, -- Incremental count...
    @c = 2000, -- Set this to the number of messages to send to the queue
    @delay = '00:00:00.020' -- waitfor delay value to pause on each iteration before sending next message
set @msg = N'<message>Test Message to Target</message>';
-- Send exactly @c test messages to the target service
while @i < @c begin
    begin try
        begin transaction;
        -- Start the conversation
        begin dialog conversation @hConversation
            from service sv_test1
            to service 'sv_test1'
            on contract ct_test1
            with encryption = off;
        -- Send the message on the dialog
        send on conversation @hConversation
            message type mt_test1
            (@msg);
        -- Commit the send...
        commit transaction;
        -- Not going to code for receiving back, as the auto-activated stored procedure handles that.
        -- NOTE that the procedure only ends a conversation when an EndDialog or Error message arrives on it.
    end try
    begin catch
        -- Show the error information
        select error_number() as err_num, error_message() as err_msg,
            error_line() as err_line, error_procedure() as err_proc;
        -- Rolling back also undoes the BEGIN DIALOG, so there is no half-open conversation left to end
        if @@trancount > 0
            rollback transaction;
    end catch
    set @i = @i + 1
    waitfor delay @delay
end
--
SCRIPT 3
--
use sbTest
go
alter procedure dbo.cp_test1
as
declare @conversation_group_id uniqueidentifier,
    @conversation_handle uniqueidentifier,
    @msg_type_name nvarchar(128),
    @error int
declare @messages table (status tinyint, priority int, queing_order bigint, conversation_group_id uniqueidentifier,
    conversation_handle uniqueidentifier, message_sequence_number bigint, service_name nvarchar(512),
    service_id int, service_contract_name nvarchar(256), service_contract_id int, message_type_name nvarchar(256),
    message_type_id int, validation nchar(2), message_body varbinary(max))
-- Process one message per transaction until the queue stays empty for 3 seconds...
while 1 = 1 begin
    begin transaction;
    -- Notice not performing a call to GET CONVERSATION GROUP here...
    waitfor (
        receive top (1) *
        from dbo.qu_test1
        into @messages), timeout 3000;
    if @@error <> 0 begin
        rollback tran;
        break;
    end
    if (select count(*) from @messages) = 0 begin
        rollback tran;
        break;
    end
    -- Get the information that we need to process the message
    select @conversation_handle = conversation_handle,
        @msg_type_name = message_type_name,
        @conversation_group_id = conversation_group_id
    from @messages
    -- If an error occurred (on the other end), or the message is an end conversation message,
    -- end the conversation from our end
    if @msg_type_name in('http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
        'http://schemas.microsoft.com/SQL/ServiceBroker/Error') begin
        end conversation @conversation_handle;
    end
    -- Stick the message data into our logging table
    begin try
        insert dbo.zztemp_messages
        select status, priority, queing_order, conversation_group_id, conversation_handle,
            message_sequence_number, service_name, service_id, service_contract_name,
            service_contract_id, message_type_name, message_type_id, validation, message_body, getdate()
        from @messages;
        -- Simulate think time for doing some work for a half a second or so
        waitfor delay '00:00:00.500'
    end try
    begin catch
        select error_message() as err_msg, error_number() as err_number,
            error_line() as err_line, error_procedure() as err_proc;
        print 'Caught error ' + quotename(error_message());
        rollback transaction;
        break;
    end catch
    -- All done processing this message, commit now...
    commit transaction;
    delete @messages
end -- while 1 = 1
go
Yes, you can.
The problem is the inner loop that iterates over the messages for a given group until RECEIVE returns an empty result set. Because every group ends with that empty result set, this kind of processing fools the activation machinery into believing that the activated procedure is keeping up with the incoming rate of messages.
The solution is to receive ALL available messages for a group in a single RECEIVE (no TOP clause), and then process the result set using a cursor:
use sbTest
go
alter procedure dbo.cp_test1
as
declare @conversation_group_id uniqueidentifier,
    @conversation_handle uniqueidentifier,
    @msg_type_name nvarchar(128),
    @queuing_order bigint; -- bigint to match the queing_order column below
declare @messages table (status tinyint, priority int, queing_order bigint, conversation_group_id uniqueidentifier,
    conversation_handle uniqueidentifier, message_sequence_number bigint, service_name nvarchar(512),
    service_id int, service_contract_name nvarchar(256), service_contract_id int, message_type_name nvarchar(256),
    message_type_id int, validation nchar(2), message_body varbinary(max))
-- declare a cursor for the @messages (LOCAL keeps it private to this procedure call)
DECLARE crsMessages CURSOR LOCAL
forward_only read_only
for
SELECT conversation_handle,
    message_type_name,
    queing_order
FROM @messages;
-- Loop until we manually break
while 1 = 1 begin
    -- Start a transaction to receive the appropriate group
    begin transaction;
    -- Reset the id so a value left over from the previous iteration can't be mistaken for a new group
    set @conversation_group_id = null;
    -- Get the next available conversation group...wait for a few seconds then loop...
    waitfor (
        get conversation group @conversation_group_id from dbo.qu_test1),
    timeout 3000;
    -- If we didn't get anything, break since we're all done and will be re-awoken automatically if needed
    if @conversation_group_id is null begin
        if @@trancount > 0
            rollback transaction;
        break;
    end;
    -- Got a group, so receive ALL of its messages at once (no TOP clause)...notice the where clause in the
    -- receive statement, which ensures we only get messages for this group...no need for a waitfor here,
    -- as calling get conversation group locks that group (so no one else gets any messages for it), and
    -- we wouldn't even get to this point if there wasn't a message for this group...
    receive *
    from dbo.qu_test1
    into @messages
    where conversation_group_id = @conversation_group_id;
    if @@rowcount = 0 or @@error <> 0 begin
        -- If an error occurred, there are probably no more messages, or something happened in the receive;
        -- don't leave the transaction open when leaving the loop
        if @@trancount > 0
            rollback transaction;
        break;
    end
    -- open the cursor over @messages
    OPEN crsMessages;
    FETCH NEXT FROM crsMessages
        INTO @conversation_handle, @msg_type_name, @queuing_order;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        -- If an error occurred (on the other end), or the message is an end conversation message,
        -- end the conversation from our end
        if @msg_type_name in('http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
            'http://schemas.microsoft.com/SQL/ServiceBroker/Error') begin
            end conversation @conversation_handle;
        end
        -- Stick the message data into our logging table
        begin try
            insert dbo.zztemp_messages
            select status, priority, queing_order, conversation_group_id, conversation_handle,
                message_sequence_number, service_name, service_id, service_contract_name,
                service_contract_id, message_type_name, message_type_id, validation, message_body, getdate()
            from @messages
            WHERE queing_order = @queuing_order;
            -- Simulate think time for doing some work for a half a second or so
            waitfor delay '00:00:00.500'
        end try
        begin catch
            select error_message() as err_msg, error_number() as err_number,
                error_line() as err_line, error_procedure() as err_proc;
            print 'Caught error ' + quotename(error_message());
            rollback transaction;
            break;
        end catch
        FETCH NEXT FROM crsMessages
            INTO @conversation_handle, @msg_type_name, @queuing_order;
    END
    CLOSE crsMessages;
    delete @messages
    -- All done processing this conversation group, commit now
    -- (skipped if the catch block has already rolled the transaction back)
    if @@trancount > 0
        commit transaction
end
DEALLOCATE crsMessages
go
HTH,
~ Remus