Google Pub/Sub не срабатывает в заданное время.

Вопрос или проблема

В моем коде есть функция, которая проверяет ожидающие криптотранзакции и обновляет статус на основе определенной логики. Каждый раз, когда создается новая криптотранзакция, сообщение отправляется в топик pub/sub, чтобы сработать через 30 секунд и проверить статус этой транзакции. Проблема заключается в том, что таймер на 30 секунд игнорируется, и триггер срабатывает немедленно, не давая криптотранзакции шанс быть обработанной. Это пример кода, я использую функции облака Firebase и запускаю эту функцию для планирования темы, используя "@google-cloud/pubsub": "^4.5.0",:

  public async scheduleNextCheck({
    uid,
    operationID,
    pendingCryptoOperation,
  }: {
    uid: string;
    operationID: string;
    pendingCryptoOperation: Operation;
  }) {
    const pubsub = new PubSub();
    const data = {
      operationID,
      uid,
      "api-version": "v1",
      differenceInSeconds: 0,
    };
    const publishTime = this.calculateNextTriggerTimeStamp(
      pendingCryptoOperation
    );

    // Преобразование временной метки Firestore в миллисекунды
    const milliseconds =
      publishTime.seconds * 1000 +
      Math.floor(publishTime.nanoseconds / 1000000);

    // Создание объекта Date для времени публикации
    let publishDate = new Date(milliseconds);

    // Рассчитываем разницу между текущим временем и временем публикации
    const now = new Date();
    const differenceInMilliseconds = publishDate.getTime() - now.getTime();
    const differenceInSeconds = Math.floor(differenceInMilliseconds / 1000);
    data.differenceInSeconds = differenceInSeconds;

    // Убедитесь, что время публикации в будущем
    if (differenceInMilliseconds < 0) {
      console.warn("Время публикации в прошлом. Корректировка на текущее время.");
      publishDate = now; // или обработать этот случай должным образом
    }

    const messageId = await pubsub
      .topic(this.runtimeConfig.pubSubPendingOperationsTopic, {
        enableOpenTelemetryTracing: true,
      })
      .publishMessage({
        json: data,
        publishTime: Timestamp.fromDate(publishDate),
      });

    Logger.info(`Триггер запланирован для операции ${operationID}`, {
      operationID,
      status: OperationStatus.pending,
      uid,
      messageId,
      publishTime: publishDate.toISOString(),
      differenceInSeconds,
    });
  }

А это calculateNextTriggerTimeStamp просто для справки:

  public calculateNextTriggerTimeStamp(operation: Operation): Timestamp {
    const { createdAt } = operation;
    const now = Date.now();
    const fiveMinutesInMillis = 5 * 60 * 1000;
    const thirtySecondsInMillis = 30 * 1000;

    // Время, прошедшее с момента создания операции
    const timeElapsed = now - createdAt.toMillis();

    // Первые 5 минут: проверка каждые 30 секунд
    if (timeElapsed <= fiveMinutesInMillis) {
      return Timestamp.fromMillis(now + thirtySecondsInMillis);
    }

    // После 5 минут начинаем увеличивать интервал
    const intervals = [
      { duration: 10 * 60 * 1000, interval: 1 * 60 * 1000 }, // Следующие 10 минут: каждую минуту
      { duration: 30 * 60 * 1000, interval: 5 * 60 * 1000 }, // Следующие 30 минут: каждые 5 минут
      { duration: 2 * 60 * 60 * 1000, interval: 15 * 60 * 1000 }, // Следующие 2 часа: каждые 15 минут
      { duration: 6 * 60 * 60 * 1000, interval: 30 * 60 * 1000 }, // Следующие 6 часов: каждые 30 минут
      { duration: 34 * 60 * 60 * 1000, interval: 1 * 60 * 60 * 1000 }, // Остальное время до 48 часов: каждую 1 час
    ];

    let cumulativeTime = fiveMinutesInMillis;

    for (const step of intervals) {
      cumulativeTime += step.duration;
      if (timeElapsed <= cumulativeTime) {
        const remainingTimeInStep = cumulativeTime - timeElapsed;
        const nextTriggerInMillis =
          step.interval - (remainingTimeInStep % step.interval);
        return Timestamp.fromMillis(now + nextTriggerInMillis);
      }
    }

    throw ErrorMessages.TaskExceedsMaxInterval;
  }

Почему планирование не работает? Есть ли проблема в самом пакете?

Ответ или решение

Проблема с тем, что Google Pub/Sub не запускается в заданное время, может быть связана с несколькими факторами, касающимися вашей реализации. Давайте подробно рассмотрим возможные причины и предложим рекомендации по их устранению.

Проверка логики вычисления времени

  1. Логика вычисления publishTime:
    Ваш метод calculateNextTriggerTimeStamp правильно вычисляет следующее время для триггера. Однако вам необходимо убедиться, что время, возвращаемое из этого метода, действительно больше текущего времени. В противном случае вызывается предупреждение, и publishDate устанавливается в текущее время.

    if (differenceInMilliseconds < 0) {
       console.warn("Publish time is in the past. Adjusting to now.");
       publishDate = now; // or handle this case appropriately
    }

    Удостоверьтесь, что pendingCryptoOperation, передаваемый в scheduleNextCheck, имеет корректное значение createdAt. В противном случае условие может всегда возвращать ложное значение.

  2. Расчет разницы во времени:
    Проверьте, как ваше приложение получает и сохраняет время создания Operation. Убедитесь, что оно точно фиксирует значение времени, так как погрешности могут приводить к проблемам с расчетом временных меток.

Настройки Pub/Sub

  1. Параметры публикации:
    Когда вы вызываете publishMessage, сообщите, что вы передаете правильное время публикации. Убедитесь, что используете параметр publishTime корректно, и он не будет неожиданно изменять поведение триггера. Проверьте, поддерживает ли версия пакета @google-cloud/pubsub эти функции в вашем окружении.

Очередь сообщений и обработка

  1. Обработка сообщений:
    Если ваши функции облачного хранилища (Cloud Functions) настроены на обработку сообщений Pub/Sub немедленно, вам следует рассмотреть возможность настройки триггеров, которые могут ожидать обработки сообщения. Проверьте настройки вашей функции, которая обрабатывает сообщения Pub/Sub, и убедитесь, что она ожидает правильные условия, прежде чем инициировать проверку.

  2. Проверка состояний:
    Убедитесь, что ваша функция, вызываемая при запуске триггера, корректно работает с состоянием криптовалютных операций. Если для проверки статуса требуется больше времени, чем ожидалось, может быть целесообразно включить дополнительную задержку или обработку состояния.

Альтернативные подходы

  1. Использование отложенных сообщений:
    Рассмотрите возможность использования отложенных сообщений (например, с помощью Cloud Tasks), чтобы обработка происходила через установленный вами таймер. Это может добавить степень надежности к реализации.

  2. Логи и мониторинг:
    Внедрите логирование (например, с использованием Stackdriver Logging) для анализа поведения вашей функции и отслеживания времени создания и обработки сообщений. Это поможет вам понять, где именно может возникать проблема.

Заключение

Существуют различные аспекты, которые могут влиять на то, почему Google Pub/Sub не запускается в установленное время. Проведя детальный анализ вашей реализации, начиная от вычислений времени до обработки и публикации сообщений, вы сможете устранить проблему и повысить надежность своей системы. Не забывайте о важности периодического тестирования и мониторинга для обеспечения стабильной работы вашего приложения.

Оцените материал
Добавить комментарий

Капча загружается...