1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
|
#!/usr/bin/env python3
"""
File Monitor Script
Monitors a directory for new files and transfers them via SCP.
Handles retries and error cases.
"""
import os
import sys
import time
import shutil
import logging
import argparse
import subprocess
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from threading import Timer
import configparser
class FileTransferHandler(FileSystemEventHandler):
def __init__(self, config):
self.config = config
self.watch_dir = Path(config['watch_directory'])
self.sent_dir = self.watch_dir / '.sent'
self.error_dir = self.watch_dir / '.error'
self.processing_files = set()
# Create directories if they don't exist
self.sent_dir.mkdir(exist_ok=True)
self.error_dir.mkdir(exist_ok=True)
# Setup logging
self.setup_logging()
def setup_logging(self):
log_level = getattr(logging, self.config.get('log_level', 'INFO').upper())
log_format = '%(asctime)s - %(levelname)s - %(message)s'
if self.config.get('log_file'):
logging.basicConfig(
level=log_level,
format=log_format,
handlers=[
logging.FileHandler(self.config['log_file']),
logging.StreamHandler(sys.stdout)
]
)
else:
logging.basicConfig(level=log_level, format=log_format)
self.logger = logging.getLogger(__name__)
def on_created(self, event):
if event.is_directory:
return
file_path = Path(event.src_path)
# Skip hidden files and our own directories
if file_path.name.startswith('.'):
return
self.logger.info(f"New file detected: {file_path}")
# Wait a bit to ensure file is completely written
delay = float(self.config.get('file_settle_delay', '2.0'))
Timer(delay, self.process_file, args=[file_path]).start()
def process_file(self, file_path):
"""Process a single file for transfer"""
if not file_path.exists():
self.logger.warning(f"File no longer exists: {file_path}")
return
if str(file_path) in self.processing_files:
self.logger.debug(f"File already being processed: {file_path}")
return
self.processing_files.add(str(file_path))
try:
# Check if file is still being written to
while not self.is_file_stable(file_path):
self.logger.debug(f"File still being written, retrying later: {file_path}")
# Timer(2.0, self.process_file, args=[file_path]).start()
time.sleep(1)
self.transfer_file_with_retry(file_path)
finally:
self.processing_files.discard(str(file_path))
def is_file_stable(self, file_path, check_interval=1.0):
"""Check if file size is stable (not being written to)"""
try:
size1 = file_path.stat().st_size
time.sleep(check_interval)
size2 = file_path.stat().st_size
return size1 == size2
except OSError:
return False
def transfer_file_with_retry(self, file_path):
"""Transfer file with retry logic"""
max_retries = int(self.config.get('max_retries', '3'))
retry_delay = float(self.config.get('retry_delay', '30.0'))
for attempt in range(max_retries):
try:
self.logger.info(f"Transferring {file_path} (attempt {attempt + 1}/{max_retries})")
if self.scp_transfer(file_path):
self.move_to_sent(file_path)
self.logger.info(f"Successfully transferred and archived: {file_path}")
return
else:
raise Exception("SCP transfer failed")
except Exception as e:
self.logger.error(f"Transfer attempt {attempt + 1} failed for {file_path}: {e}")
if attempt < max_retries - 1:
self.logger.info(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
else:
self.logger.error(f"All transfer attempts failed for {file_path}, moving to error folder")
self.move_to_error(file_path)
def scp_transfer(self, file_path):
"""Perform SCP transfer"""
try:
# Build SCP command
remote_host = self.config['remote_host']
remote_path = self.config.get('remote_path', '.')
ssh_key = self.config.get('ssh_key')
ssh_user = self.config.get('ssh_user', 'root')
ssh_port = self.config.get('ssh_port', '22')
# Remote destination
if remote_path.endswith('/'):
remote_dest = f"{ssh_user}@{remote_host}:{remote_path}{file_path.name}"
else:
remote_dest = f"{ssh_user}@{remote_host}:{remote_path}/{file_path.name}"
# Build command
cmd = ['scp']
# Add SSH options
ssh_options = [
'-o', 'BatchMode=yes',
'-o', 'StrictHostKeyChecking=no',
'-o', f'ConnectTimeout={self.config.get("connect_timeout", "30")}',
'-P', ssh_port
]
if ssh_key:
ssh_options.extend(['-i', ssh_key])
cmd.extend(ssh_options)
cmd.extend([str(file_path), remote_dest])
self.logger.debug(f"Running command: {' '.join(cmd)}")
# Execute SCP
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=int(self.config.get('transfer_timeout', '300'))
)
if result.returncode == 0:
self.logger.debug(f"SCP transfer successful for {file_path}")
return True
else:
self.logger.error(f"SCP failed with return code {result.returncode}")
self.logger.error(f"STDERR: {result.stderr}")
return False
except subprocess.TimeoutExpired:
self.logger.error(f"SCP transfer timed out for {file_path}")
return False
except Exception as e:
self.logger.error(f"SCP transfer error for {file_path}: {e}")
return False
def move_to_sent(self, file_path):
"""Move file to sent directory"""
try:
dest_path = self.sent_dir / file_path.name
# Handle filename conflicts
counter = 1
while dest_path.exists():
name_parts = file_path.stem, counter, file_path.suffix
dest_path = self.sent_dir / f"{name_parts[0]}_{name_parts[1]}{name_parts[2]}"
counter += 1
shutil.move(str(file_path), str(dest_path))
self.logger.info(f"File archived to: {dest_path}")
except Exception as e:
self.logger.error(f"Failed to move file to sent directory: {e}")
def move_to_error(self, file_path):
"""Move file to error directory"""
try:
dest_path = self.error_dir / file_path.name
# Handle filename conflicts
counter = 1
while dest_path.exists():
name_parts = file_path.stem, counter, file_path.suffix
dest_path = self.error_dir / f"{name_parts[0]}_{name_parts[1]}{name_parts[2]}"
counter += 1
shutil.move(str(file_path), str(dest_path))
self.logger.info(f"File moved to error directory: {dest_path}")
except Exception as e:
self.logger.error(f"Failed to move file to error directory: {e}")
def load_config(config_file):
"""Load configuration from file"""
config = configparser.ConfigParser()
config.read(config_file)
if 'transfer' not in config:
raise ValueError("Configuration file must contain a [transfer] section")
return dict(config['transfer'])
def create_sample_config(config_file):
"""Create a sample configuration file"""
config = configparser.ConfigParser()
config['transfer'] = {
'watch_directory': '/path/to/watch/directory',
'remote_host': 'example.com',
'remote_path': '/remote/destination/path',
'ssh_user': 'username',
'ssh_key': '/path/to/ssh/private/key',
'ssh_port': '22',
'max_retries': '3',
'retry_delay': '30.0',
'file_settle_delay': '2.0',
'connect_timeout': '30',
'transfer_timeout': '300',
'log_level': 'INFO',
'log_file': '/path/to/logfile.log'
}
with open(config_file, 'w') as f:
config.write(f)
print(f"Sample configuration created at: {config_file}")
print("Please edit the configuration file with your settings before running the script.")
def main():
parser = argparse.ArgumentParser(description='Monitor directory and transfer files via SCP')
parser.add_argument('-c', '--config', required=True, help='Configuration file path')
parser.add_argument('--create-config', action='store_true',
help='Create a sample configuration file')
args = parser.parse_args()
if args.create_config:
create_sample_config(args.config)
return
if not os.path.exists(args.config):
print(f"Configuration file not found: {args.config}")
print(f"Use --create-config to create a sample configuration file")
sys.exit(1)
try:
config = load_config(args.config)
# Validate required settings
required_settings = ['watch_directory', 'remote_host']
for setting in required_settings:
if setting not in config:
print(f"Required setting missing from config: {setting}")
sys.exit(1)
watch_dir = Path(config['watch_directory'])
if not watch_dir.exists():
print(f"Watch directory does not exist: {watch_dir}")
sys.exit(1)
# Create handler and observer
handler = FileTransferHandler(config)
observer = Observer()
observer.schedule(handler, str(watch_dir), recursive=False)
# Start monitoring
observer.start()
handler.logger.info(f"Started monitoring directory: {watch_dir}")
handler.logger.info(f"Transferring to: {config['remote_host']}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
handler.logger.info("Stopping file monitor...")
observer.stop()
observer.join()
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
|