Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handshake issue when getting replayid from async call to database #36

Open
kathariyasunny16 opened this issue Aug 18, 2023 · 8 comments

Comments

@kathariyasunny16
Copy link

kathariyasunny16 commented Aug 18, 2023

I am trying to get the replayid from the cosmos database before creating a Bayeux client object. I am not able to handshake when after bayeuxclient.Handshake(). The same code is working fine when I am not using the cosmos db call. Please find below code snippet.

var authResponse = await _authenticateConnectedApp.Authenticate().ConfigureAwait(false);
            if (authResponse.AccessToken is not null)
            {
                var (channel, apiVersion) = Config.GetSaleForceAccountChannelDetails();
                //DB call
                var lastUsedReplayId = await _operatingParametersRepository.GetSaleForceLastUsedReplayId(channel).ConfigureAwait(false);
               
                var endpointUrl = $"{authResponse.InstanceUrl}/cometd/{apiVersion}";
                var readTimeOut = 120 * 1000;

                var transportOptions = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
                {
                    {ClientTransport.TIMEOUT_OPTION, readTimeOut},
                    {ClientTransport.MAX_NETWORK_DELAY_OPTION, readTimeOut}
                };
                var headers = new NameValueCollection
                    {{HttpRequestHeader.Authorization.ToString(), $"Bearer {authResponse.AccessToken}"}};

                var transport = new LongPollingTransport(transportOptions, headers);
                var transports = new ClientTransport[]
                {
                    transport
                };

                // Create a CometD client instance
                var bayeuxClient = new BayeuxClient(endpointUrl, transports);
                bayeuxClient.Handshake();
                bayeuxClient.WaitFor(100000, new List<BayeuxClient.State> {BayeuxClient.State.CONNECTED});
                log.LogInformation("Handshake Status : {Status}", bayeuxClient.Handshook);
                bayeuxClient.GetChannel(channel, lastUsedReplayId).Subscribe(new SalesForceMessageListener());
                log.LogInformation("Connection Status : {Status}. Now, Waiting for the event", bayeuxClient.Connected);
            }

// CosmosHelperRepository Code Called from above function

public async Task<long> GetSaleForceLastUsedReplayId(string channel)
    {
        try
        {
            var dbExistingRecordList = await _cosmosHelper.GetQuery(x =>
                x.Pk == nameof(SalesForceReplayIdRunDetail)
                && x.ChannelName == channel);
            var lastUsedReplayId = dbExistingRecordList.FirstOrDefault()?.LastUsedReplayId ?? -1;
            return lastUsedReplayId;
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }
        
    }

// Implementation of Cosmos helper GetQuery method called in above repository function

public async Task<IList<T>> GetQuery(Expression<Func<T, bool>> predicate)
    {
        var container = GetContainer();
        var query = container.GetItemLinqQueryable<T>()
            .Where(predicate);
        var list = new List<T>();
        using var feedIterator = _feedIteratorProvider.GetFeedIterator(query);
        while (feedIterator.HasMoreResults) list.AddRange(await feedIterator.ReadNextAsync());

        return list;
    }

Just an FYI, I am able to get the replayId from cosmos db using the above code but the handshake is not happening.

@kathariyasunny16
Copy link
Author

kathariyasunny16 commented Aug 18, 2023

@kdcllc please have a look. I am trying to resolve this issue for the last 2 days. Any help will be much appreciated.

Thank you so much for your response @kdcllc . I have updated the code in the issue and sharing here as well for your reference.

var authResponse = await _authenticateConnectedApp.Authenticate().ConfigureAwait(false);
if (authResponse.AccessToken is not null)
{
    var (channel, apiVersion) = Config.GetSaleForceAccountChannelDetails();
    
    //DB call
    var lastUsedReplayId = await _operatingParametersRepository.GetSaleForceLastUsedReplayId(channel).ConfigureAwait(false);

    var endpointUrl = $"{authResponse.InstanceUrl}/cometd/{apiVersion}";
    var readTimeOut = 120 * 1000;

    var transportOptions = new Dictionary<string,object>(StringComparer.OrdinalIgnoreCase)
    {
        {ClientTransport.TIMEOUT_OPTION, readTimeOut},
        {ClientTransport.MAX_NETWORK_DELAY_OPTION, readTimeOut}
    };

    var headers = new NameValueCollection{
        {HttpRequestHeader.Authorization.ToString(),
        $"Bearer {authResponse.AccessToken}"}};

    var transport = new LongPollingTransport(transportOptions, headers);
    var transports = new ClientTransport[]
    {
        transport
    };

    // Create a CometD client instance
    var bayeuxClient = new BayeuxClient(endpointUrl, transports);
    bayeuxClient.Handshake();
    bayeuxClient.WaitFor(100000, new List<BayeuxClient.State> {BayeuxClient.State.CONNECTED});
    log.LogInformation("Handshake Status : {Status}", bayeuxClient.Handshook);

    bayeuxClient.GetChannel(channel,lastUsedReplayId).Subscribe(new SalesForceMessageListener());
    log.LogInformation("Connection Status : {Status}. Now, Waiting for the event", bayeuxClient.Connected);
}
// CosmosHelperRepository Code Called from above function
public async Task<long> GetSaleForceLastUsedReplayId(string channel)
{
    try
    {
        var dbExistingRecordList = await _cosmosHelper.GetQuery(x=>  x.Pk == nameof(SalesForceReplayIdRunDetail) && x.ChannelName == channel);
        var lastUsedReplayId = dbExistingRecordList.FirstOrDefault()?.LastUsedReplayId ?? -1;
        return lastUsedReplayId;
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
        throw;
    }

}
// Implementation of Cosmos helper GetQuery method called in above
// repository function
public async Task<IList<T>> GetQuery(Expression<Func<T, bool>> predicate)
{
    var container = GetContainer();
    var query = container.GetItemLinqQueryable<T>()
        .Where(predicate);
    var list = new List<T>();
    using var feedIterator = _feedIteratorProvider.GetFeedIterator(query);
    while (feedIterator.HasMoreResults) list.AddRange(await feedIterator.ReadNextAsync());

    return list;
}

Just an FYI, I am able to get the replayId from cosmos db using the
above code but the handshake is not happening.

@kathariyasunny16
Copy link
Author

kathariyasunny16 commented Aug 19, 2023

Hi @kdcllc,

I debugged this and got to know that the issue is not with an async call to the cosmos repository to get the replayid.
I think the handshake is not happening when I am using the below code in Azure function startup to resolve the dependency of the cosmos repository and helper.

Code used in startup to resolve the dependency:

builder.Services.AddTransient<IOperatingParametersRepository>(s => new OperatingParametersRepository(
            new CosmosHelper<SalesForceReplayIdRunDetail>(s.GetService<CosmosClient>(),
                Settings.Cosmos.DatabaseName, Settings.Cosmos.OperatingParametersContainer,
                s.GetService<IFeedIteratorProvider>(), s.GetService<ISerializer>(),
                s.GetService<ILogger<CosmosHelper<SalesForceReplayIdRunDetail>>>())));

Cosmos Repository Constructor:

public OperatingParametersRepository(ICosmosHelper<SalesForceReplayIdRunDetail> cosmosHelper)
    {
        _cosmosHelper = cosmosHelper;
    }

Cosmos Helper Constructor:

public CosmosHelper(CosmosClient cosmosClient, string databaseName, string containerName,
        IFeedIteratorProvider feedIteratorProvider, ISerializer serializer, ILogger<CosmosHelper<T>> logger)
        : base(cosmosClient, containerName, serializer, logger)
    {
        _cosmosClient = cosmosClient;
        _databaseName = databaseName;
        _containerName = containerName;
        _feedIteratorProvider = feedIteratorProvider;
    }

@kathariyasunny16
Copy link
Author

@kdcllc @martin-podlubny @r-matias @jesbacon,
Any guidance on this?

Repository owner deleted a comment from kathariyasunny16 Aug 24, 2023
@kdcllc
Copy link
Owner

kdcllc commented Aug 24, 2023

Hi @kdcllc,

I debugged this and got to know that the issue is not with an async call to the cosmos repository to get the replayid. I think the handshake is not happening when I am using the below code in Azure function startup to resolve the dependency of the cosmos repository and helper.

Code used in startup to resolve the dependency:

builder.Services.AddTransient<IOperatingParametersRepository>(s => new OperatingParametersRepository(
            new CosmosHelper<SalesForceReplayIdRunDetail>(s.GetService<CosmosClient>(),
                Settings.Cosmos.DatabaseName, Settings.Cosmos.OperatingParametersContainer,
                s.GetService<IFeedIteratorProvider>(), s.GetService<ISerializer>(),
                s.GetService<ILogger<CosmosHelper<SalesForceReplayIdRunDetail>>>())));

Cosmos Repository Constructor:

public OperatingParametersRepository(ICosmosHelper<SalesForceReplayIdRunDetail> cosmosHelper)
    {
        _cosmosHelper = cosmosHelper;
    }

Cosmos Helper Constructor:

public CosmosHelper(CosmosClient cosmosClient, string databaseName, string containerName,
        IFeedIteratorProvider feedIteratorProvider, ISerializer serializer, ILogger<CosmosHelper<T>> logger)
        : base(cosmosClient, containerName, serializer, logger)
    {
        _cosmosClient = cosmosClient;
        _databaseName = databaseName;
        _containerName = containerName;
        _feedIteratorProvider = feedIteratorProvider;
    }

@kathariyasunny16
Does this code work as console application and not as azure function?

What operating system are you using?

@kathariyasunny16
Copy link
Author

@kdcllc
We are using Windows 10 and haven't tried the console app as we need to use this in the Azure function.

@kathariyasunny16
Copy link
Author

kathariyasunny16 commented Oct 17, 2023

Hello @kdcllc @martin-podlubny @r-matias ,
I have a quick question. How long the long-polling connection will be alive when I am subscribed to one channel? My subscriber is able to receive the events for 3 hours after subscribing and it automatically reconnects after 2 minutes for 3 hours. But it's not able to receive the events after 3 hours and instead of /cometd/54.0/connect request, it sends /cometd/54.0/handshake after 3 hours and then stops the subscription.
So, I just want to know whether I need to run my function which subscribes to the channel again after 3 hours. If yes, then when I try to run the function it's not able to make a successful handshake.
Please suggest some solution. Thanks in advance.

@kathariyasunny16
Copy link
Author

Hi @kdcllc @martin-podlubny @r-matias @jesbacon ,
Please reply to the above comment. I am stuck on this and need your support.

@jordanmcdonald-rh
Copy link

jordanmcdonald-rh commented Apr 22, 2024

@kathariyasunny16 when I use a lastReplayId or a -2 I get none of the missed responses. Is it working for you?

What does your SalesForceMessageListener class look like?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants