Skip to content

Commit

Permalink
refactor: Stability improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
divmgl committed Mar 6, 2025
1 parent bfc8357 commit de78763
Show file tree
Hide file tree
Showing 9 changed files with 1,387 additions and 1,180 deletions.
292 changes: 132 additions & 160 deletions dist/cjs/index.cjs

Large diffs are not rendered by default.

287 changes: 131 additions & 156 deletions dist/esm/index.js

Large diffs are not rendered by default.

324 changes: 11 additions & 313 deletions dist/types/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,307 +1,6 @@
import { Socket } from "net";
import EventEmitter from "events";
/**
* Handler for processing command responses
*/
export type CommandHandler<T> = (chunk: Uint8Array) => T | Promise<T>;
/**
* Command execution state
*/
export declare class CommandExecution<T> {
/** Handlers for processing command response */
handlers: CommandHandler<T | void>[];
/** Event emitter for command completion */
emitter: EventEmitter;
}
/**
* Options for putting a job into a tube
*/
export interface JackdPutOpts {
/** Priority value between 0 and 2**32. Jobs with smaller priority values will be scheduled before jobs with larger priorities. 0 is most urgent. */
priority?: number;
/** Number of seconds to wait before putting the job in the ready queue. Job will be in "delayed" state during this time. Maximum is 2**32-1. */
delay?: number;
/** Time to run - number of seconds to allow a worker to run this job. Minimum is 1. If 0 is provided, server will use 1. Maximum is 2**32-1. */
ttr?: number;
}
/**
* Raw job data returned from reserveRaw
*/
export interface JackdJobRaw {
/** Unique job ID for this instance of beanstalkd */
id: number;
/** Raw job payload as bytes */
payload: Uint8Array;
}
/**
* Job data with decoded string payload
*/
export interface JackdJob {
/** Unique job ID for this instance of beanstalkd */
id: number;
/** Job payload decoded as UTF-8 string */
payload: string;
}
/**
* Stats for a specific job
*/
export interface JobStats {
/** Job ID */
id: number;
/** Name of tube containing this job */
tube: string;
/** Current state of the job */
state: "ready" | "delayed" | "reserved" | "buried";
/** Priority value set by put/release/bury */
pri: number;
/** Time in seconds since job creation */
age: number;
/** Seconds remaining until job is put in ready queue */
delay: number;
/** Time to run in seconds */
ttr: number;
/** Seconds until server puts job into ready queue (only meaningful if reserved/delayed) */
timeLeft: number;
/** Binlog file number containing this job (0 if binlog disabled) */
file: number;
/** Number of times job has been reserved */
reserves: number;
/** Number of times job has timed out during reservation */
timeouts: number;
/** Number of times job has been released */
releases: number;
/** Number of times job has been buried */
buries: number;
/** Number of times job has been kicked */
kicks: number;
}
/**
* Stats for a specific tube
*/
export interface TubeStats {
/** Tube name */
name: string;
/** Number of ready jobs with priority < 1024 */
currentJobsUrgent: number;
/** Number of jobs in ready queue */
currentJobsReady: number;
/** Number of jobs reserved by all clients */
currentJobsReserved: number;
/** Number of delayed jobs */
currentJobsDelayed: number;
/** Number of buried jobs */
currentJobsBuried: number;
/** Total jobs created in this tube */
totalJobs: number;
/** Number of open connections using this tube */
currentUsing: number;
/** Number of connections waiting on reserve */
currentWaiting: number;
/** Number of connections watching this tube */
currentWatching: number;
/** Seconds tube is paused for */
pause: number;
/** Total delete commands for this tube */
cmdDelete: number;
/** Total pause-tube commands for this tube */
cmdPauseTube: number;
/** Seconds until tube is unpaused */
pauseTimeLeft: number;
}
/**
* System-wide statistics
*/
export interface SystemStats {
/** Number of ready jobs with priority < 1024 */
currentJobsUrgent: number;
/** Number of jobs in ready queue */
currentJobsReady: number;
/** Number of jobs reserved by all clients */
currentJobsReserved: number;
/** Number of delayed jobs */
currentJobsDelayed: number;
/** Number of buried jobs */
currentJobsBuried: number;
/** Total put commands */
cmdPut: number;
/** Total peek commands */
cmdPeek: number;
/** Total peek-ready commands */
cmdPeekReady: number;
/** Total peek-delayed commands */
cmdPeekDelayed: number;
/** Total peek-buried commands */
cmdPeekBuried: number;
/** Total reserve commands */
cmdReserve: number;
/** Total reserve-with-timeout commands */
cmdReserveWithTimeout: number;
/** Total touch commands */
cmdTouch: number;
/** Total use commands */
cmdUse: number;
/** Total watch commands */
cmdWatch: number;
/** Total ignore commands */
cmdIgnore: number;
/** Total delete commands */
cmdDelete: number;
/** Total release commands */
cmdRelease: number;
/** Total bury commands */
cmdBury: number;
/** Total kick commands */
cmdKick: number;
/** Total stats commands */
cmdStats: number;
/** Total stats-job commands */
cmdStatsJob: number;
/** Total stats-tube commands */
cmdStatsTube: number;
/** Total list-tubes commands */
cmdListTubes: number;
/** Total list-tube-used commands */
cmdListTubeUsed: number;
/** Total list-tubes-watched commands */
cmdListTubesWatched: number;
/** Total pause-tube commands */
cmdPauseTube: number;
/** Total job timeouts */
jobTimeouts: number;
/** Total jobs created */
totalJobs: number;
/** Maximum job size in bytes */
maxJobSize: number;
/** Number of currently existing tubes */
currentTubes: number;
/** Number of currently open connections */
currentConnections: number;
/** Number of open connections that have issued at least one put */
currentProducers: number;
/** Number of open connections that have issued at least one reserve */
currentWorkers: number;
/** Number of connections waiting on reserve */
currentWaiting: number;
/** Total connections */
totalConnections: number;
/** Process ID of server */
pid: number;
/** Version string of server */
version: string;
/** User CPU time of process */
rusageUtime: number;
/** System CPU time of process */
rusageStime: number;
/** Seconds since server started */
uptime: number;
/** Index of oldest binlog file needed */
binlogOldestIndex: number;
/** Index of current binlog file */
binlogCurrentIndex: number;
/** Maximum binlog file size */
binlogMaxSize: number;
/** Total records written to binlog */
binlogRecordsWritten: number;
/** Total records migrated in binlog */
binlogRecordsMigrated: number;
/** Whether server is in drain mode */
draining: boolean;
/** Random ID of server process */
id: string;
/** Server hostname */
hostname: string;
/** Server OS version */
os: string;
/** Server machine architecture */
platform: string;
}
/**
* Options for releasing a job back to ready queue
*/
interface JackdReleaseOpts {
/** New priority to assign to job */
priority?: number;
/** Seconds to wait before putting job in ready queue */
delay?: number;
}
/**
* Options for pausing a tube
*/
interface JackdPauseTubeOpts {
/** Seconds to pause the tube for */
delay?: number;
}
type JackdPutArgs = [
payload: Uint8Array | string | object,
options?: JackdPutOpts
];
type JackdReleaseArgs = [jobId: number, options?: JackdReleaseOpts];
type JackdPauseTubeArgs = [tubeId: string, options?: JackdPauseTubeOpts];
type JackdJobArgs = [jobId: number];
type JackdTubeArgs = [tubeId: string];
type JackdArgs = JackdPutArgs | JackdReleaseArgs | JackdPauseTubeArgs | JackdJobArgs | JackdTubeArgs | never[] | number[] | string[] | [jobId: number, priority?: number];
/**
* Client options
*/
export type JackdProps = {
/** Whether to automatically connect to the server */
autoconnect?: boolean;
/** Hostname of beanstalkd server */
host?: string;
/** Port number, defaults to 11300 */
port?: number;
/** Whether to automatically reconnect on connection loss */
autoReconnect?: boolean;
/** Initial delay in ms between reconnection attempts */
initialReconnectDelay?: number;
/** Maximum delay in ms between reconnection attempts */
maxReconnectDelay?: number;
/** Maximum number of reconnection attempts (0 for infinite) */
maxReconnectAttempts?: number;
};
/**
* Standardized error codes for Jackd operations
*/
export declare enum JackdErrorCode {
/** Server out of memory */
OUT_OF_MEMORY = "OUT_OF_MEMORY",
/** Internal server error */
INTERNAL_ERROR = "INTERNAL_ERROR",
/** Bad command format */
BAD_FORMAT = "BAD_FORMAT",
/** Unknown command */
UNKNOWN_COMMAND = "UNKNOWN_COMMAND",
/** Job body not properly terminated */
EXPECTED_CRLF = "EXPECTED_CRLF",
/** Job larger than max-job-size */
JOB_TOO_BIG = "JOB_TOO_BIG",
/** Server in drain mode */
DRAINING = "DRAINING",
/** Timeout exceeded with no job */
TIMED_OUT = "TIMED_OUT",
/** Reserved job TTR expiring */
DEADLINE_SOON = "DEADLINE_SOON",
/** Resource not found */
NOT_FOUND = "NOT_FOUND",
/** Cannot ignore only watched tube */
NOT_IGNORED = "NOT_IGNORED",
/** Unexpected server response */
INVALID_RESPONSE = "INVALID_RESPONSE",
/** Socket is not connected */
NOT_CONNECTED = "NOT_CONNECTED",
/** Fatal connection error */
FATAL_CONNECTION_ERROR = "FATAL_CONNECTION_ERROR"
}
/**
* Custom error class for Jackd operations
*/
export declare class JackdError extends Error {
/** Error code indicating the type of error */
code: JackdErrorCode;
/** Raw response from server if available */
response?: string;
constructor(code: JackdErrorCode, message?: string, response?: string);
}
import { type JackdProps } from "./types";
import type { CommandHandler, JackdArgs, JackdJob, JackdJobRaw, JackdPutOpts, JobStats, SystemStats, TubeStats } from "./types";
/**
* Beanstalkd client
*
Expand All @@ -323,7 +22,6 @@ export declare class JackdError extends Error {
export declare class JackdClient {
socket: Socket;
connected: boolean;
private buffer;
private chunkLength;
private host;
private port;
Expand All @@ -337,15 +35,14 @@ export declare class JackdClient {
private isReconnecting;
private watchedTubes;
private currentTube;
messages: Uint8Array[];
executions: CommandExecution<unknown>[];
private executions;
private buffer;
private commandBuffer;
private isProcessing;
constructor({ autoconnect, host, port, autoReconnect, initialReconnectDelay, maxReconnectDelay, maxReconnectAttempts }?: JackdProps);
private createSocket;
private handleDisconnect;
private setupSocketListeners;
private attemptReconnect;
processChunk(head: Uint8Array): Promise<void>;
flushExecutions(): Promise<void>;
/**
* For environments where network partitioning is common.
* @returns {Boolean}
Expand All @@ -361,10 +58,12 @@ export declare class JackdClient {
* Reuses the previously used tube after a reconnection
*/
private reuseTube;
write(buffer: Uint8Array): Promise<void>;
quit: () => Promise<void>;
close: () => Promise<void>;
disconnect: () => Promise<void>;
createCommandHandler<TArgs extends JackdArgs, TReturn>(commandStringFunction: (...args: TArgs) => Uint8Array, handlers: CommandHandler<TReturn | void>[]): (...args: TArgs) => Promise<TReturn>;
private processNextCommand;
private write;
/**
* Puts a job into the currently used tube
* @param payload Job data - will be JSON stringified if object
Expand Down Expand Up @@ -425,7 +124,7 @@ export declare class JackdClient {
* @throws {Error} BURIED if server out of memory
* @throws {Error} NOT_FOUND if job doesn't exist or not reserved by this client
*/
release: (jobId: number, options?: JackdReleaseOpts | undefined) => Promise<void>;
release: (jobId: number, options?: import("./types").JackdReleaseOpts | undefined) => Promise<void>;
/**
* Buries a job
* @param id Job ID to bury
Expand Down Expand Up @@ -458,7 +157,7 @@ export declare class JackdClient {
* @param delay Seconds to pause for
* @throws {Error} NOT_FOUND if tube doesn't exist
*/
pauseTube: (tubeId: string, options?: JackdPauseTubeOpts | undefined) => Promise<void>;
pauseTube: (tubeId: string, options?: import("./types").JackdPauseTubeOpts | undefined) => Promise<void>;
/**
* Peeks at a specific job
* @param id Job ID to peek at
Expand Down Expand Up @@ -531,6 +230,5 @@ export declare class JackdClient {
* @returns Name of tube being used
*/
listTubeUsed: () => Promise<string>;
createCommandHandler<TArgs extends JackdArgs, TReturn>(commandStringFunction: (...args: TArgs) => Uint8Array, handlers: CommandHandler<TReturn | void>[]): (...args: TArgs) => Promise<TReturn>;
}
export default JackdClient;
Loading

0 comments on commit de78763

Please sign in to comment.